001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.File; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.UncheckedIOException; 029import java.lang.reflect.Field; 030import java.lang.reflect.Modifier; 031import java.net.BindException; 032import java.net.DatagramSocket; 033import java.net.InetAddress; 034import java.net.ServerSocket; 035import java.net.Socket; 036import java.net.UnknownHostException; 037import java.nio.charset.StandardCharsets; 038import java.security.MessageDigest; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.HashSet; 044import java.util.Iterator; 045import java.util.List; 046import java.util.Map; 047import java.util.NavigableSet; 048import java.util.Properties; 049import java.util.Random; 050import java.util.Set; 051import java.util.TreeSet; 052import java.util.concurrent.ExecutionException; 053import java.util.concurrent.ThreadLocalRandom; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicReference; 056import java.util.function.BooleanSupplier; 057import org.apache.commons.io.FileUtils; 058import org.apache.commons.lang3.RandomStringUtils; 059import org.apache.hadoop.conf.Configuration; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 063import org.apache.hadoop.hbase.Waiter.Predicate; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.AsyncAdmin; 066import org.apache.hadoop.hbase.client.AsyncClusterConnection; 067import org.apache.hadoop.hbase.client.BufferedMutator; 068import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.Consistency; 074import org.apache.hadoop.hbase.client.Delete; 075import org.apache.hadoop.hbase.client.Durability; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.Hbck; 078import org.apache.hadoop.hbase.client.MasterRegistry; 079import org.apache.hadoop.hbase.client.Put; 080import org.apache.hadoop.hbase.client.RegionInfo; 081import org.apache.hadoop.hbase.client.RegionInfoBuilder; 082import 
org.apache.hadoop.hbase.client.RegionLocator; 083import org.apache.hadoop.hbase.client.Result; 084import org.apache.hadoop.hbase.client.ResultScanner; 085import org.apache.hadoop.hbase.client.Scan; 086import org.apache.hadoop.hbase.client.Scan.ReadType; 087import org.apache.hadoop.hbase.client.Table; 088import org.apache.hadoop.hbase.client.TableDescriptor; 089import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 090import org.apache.hadoop.hbase.client.TableState; 091import org.apache.hadoop.hbase.fs.HFileSystem; 092import org.apache.hadoop.hbase.io.compress.Compression; 093import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 094import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 095import org.apache.hadoop.hbase.io.hfile.BlockCache; 096import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 097import org.apache.hadoop.hbase.io.hfile.HFile; 098import org.apache.hadoop.hbase.ipc.RpcServerInterface; 099import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 100import org.apache.hadoop.hbase.master.HMaster; 101import org.apache.hadoop.hbase.master.RegionState; 102import org.apache.hadoop.hbase.master.ServerManager; 103import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 104import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 105import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 106import org.apache.hadoop.hbase.master.assignment.RegionStates; 107import org.apache.hadoop.hbase.mob.MobFileCache; 108import org.apache.hadoop.hbase.regionserver.BloomType; 109import org.apache.hadoop.hbase.regionserver.ChunkCreator; 110import org.apache.hadoop.hbase.regionserver.HRegion; 111import org.apache.hadoop.hbase.regionserver.HRegionServer; 112import org.apache.hadoop.hbase.regionserver.HStore; 113import org.apache.hadoop.hbase.regionserver.InternalScanner; 114import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 115import org.apache.hadoop.hbase.regionserver.Region; 116import org.apache.hadoop.hbase.regionserver.RegionScanner; 117import org.apache.hadoop.hbase.regionserver.RegionServerServices; 118import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 119import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 120import org.apache.hadoop.hbase.security.User; 121import org.apache.hadoop.hbase.security.UserProvider; 122import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 123import org.apache.hadoop.hbase.util.Bytes; 124import org.apache.hadoop.hbase.util.CommonFSUtils; 125import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 126import org.apache.hadoop.hbase.util.FSUtils; 127import org.apache.hadoop.hbase.util.JVM; 128import org.apache.hadoop.hbase.util.JVMClusterUtil; 129import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 130import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 131import org.apache.hadoop.hbase.util.Pair; 132import org.apache.hadoop.hbase.util.ReflectionUtils; 133import org.apache.hadoop.hbase.util.RegionSplitter; 134import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; 135import org.apache.hadoop.hbase.util.RetryCounter; 136import org.apache.hadoop.hbase.util.Threads; 137import org.apache.hadoop.hbase.wal.WAL; 138import org.apache.hadoop.hbase.wal.WALFactory; 139import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 140import org.apache.hadoop.hbase.zookeeper.ZKConfig; 141import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 142import org.apache.hadoop.hdfs.DFSClient; 143import 
org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for the old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need while testing. It
 * manages one cluster at a time only. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on the classpath and hbase-site.xml for logging and test-run
 * configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  public static final boolean PRESPLIT_TEST_TABLE = true;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on the test filesystem where we put the data for this instance of HBaseTestingUtil.
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized over the combinations of memstore-TS and tags flags. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
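   * <p>
   * A minimal usage sketch (the starting port below is arbitrary and only for illustration):
   *
   * <pre>
   * int candidate = 60010;
   * while (!HBaseTestingUtil.available(candidate)) {
   *   candidate++;
   * }
   * // candidate was free at probe time; it can still race with another process binding it later
   * </pre>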
221 * @param port the port number to check for availability 222 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 223 */ 224 public static boolean available(int port) { 225 ServerSocket ss = null; 226 DatagramSocket ds = null; 227 try { 228 ss = new ServerSocket(port); 229 ss.setReuseAddress(true); 230 ds = new DatagramSocket(port); 231 ds.setReuseAddress(true); 232 return true; 233 } catch (IOException e) { 234 // Do nothing 235 } finally { 236 if (ds != null) { 237 ds.close(); 238 } 239 240 if (ss != null) { 241 try { 242 ss.close(); 243 } catch (IOException e) { 244 /* should not be thrown */ 245 } 246 } 247 } 248 249 return false; 250 } 251 252 /** 253 * Create all combinations of Bloom filters and compression algorithms for testing. 254 */ 255 private static List<Object[]> bloomAndCompressionCombinations() { 256 List<Object[]> configurations = new ArrayList<>(); 257 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 258 for (BloomType bloomType : BloomType.values()) { 259 configurations.add(new Object[] { comprAlgo, bloomType }); 260 } 261 } 262 return Collections.unmodifiableList(configurations); 263 } 264 265 /** 266 * Create combination of memstoreTS and tags 267 */ 268 private static List<Object[]> memStoreTSAndTagsCombination() { 269 List<Object[]> configurations = new ArrayList<>(); 270 configurations.add(new Object[] { false, false }); 271 configurations.add(new Object[] { false, true }); 272 configurations.add(new Object[] { true, false }); 273 configurations.add(new Object[] { true, true }); 274 return Collections.unmodifiableList(configurations); 275 } 276 277 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 278 List<Object[]> configurations = new ArrayList<>(); 279 configurations.add(new Object[] { false, false, true }); 280 configurations.add(new Object[] { false, false, false }); 281 configurations.add(new Object[] { false, true, true }); 282 configurations.add(new Object[] { false, true, false }); 283 configurations.add(new Object[] { true, false, true }); 284 configurations.add(new Object[] { true, false, false }); 285 configurations.add(new Object[] { true, true, true }); 286 configurations.add(new Object[] { true, true, false }); 287 return Collections.unmodifiableList(configurations); 288 } 289 290 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 291 bloomAndCompressionCombinations(); 292 293 /** 294 * <p> 295 * Create an HBaseTestingUtility using a default configuration. 296 * <p> 297 * Initially, all tmp files are written to a local test data directory. Once 298 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 299 * data will be written to the DFS directory instead. 300 */ 301 public HBaseTestingUtil() { 302 this(HBaseConfiguration.create()); 303 } 304 305 /** 306 * <p> 307 * Create an HBaseTestingUtility using a given configuration. 308 * <p> 309 * Initially, all tmp files are written to a local test data directory. Once 310 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 311 * data will be written to the DFS directory instead. 
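   * <p>
   * A typical test lifecycle, shown here only as an illustrative sketch (all methods referenced
   * below exist on this class):
   *
   * <pre>
   * HBaseTestingUtil util = new HBaseTestingUtil(HBaseConfiguration.create());
   * util.startMiniCluster();
   * try {
   *   // exercise the cluster through util.getConnection() / util.getAdmin()
   * } finally {
   *   util.shutdownMiniCluster();
   * }
   * </pre>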
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true; tests then get random port
    // assignment unless they explicitly opt out
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is
   * keyed by the Configuration. If, say, a Connection was being used against a cluster that had
   * been shut down, see {@link #shutdownMiniCluster()}, then the Connection will no longer be
   * wholesome. Rather than using the returned Configuration directly, it is usually best to make
   * a copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but there is nothing we can do about it at the moment; single instance only
   * is how the minidfscluster works. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
   * conf, and as a system property for hadoop.tmp.dir (We do not create them!).
   * @return The calculated data test build directory, if newly-created.
384 */ 385 @Override 386 protected Path setupDataTestDir() { 387 Path testPath = super.setupDataTestDir(); 388 if (null == testPath) { 389 return null; 390 } 391 392 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 393 394 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 395 // we want our own value to ensure uniqueness on the same machine 396 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 397 398 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 399 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 400 return testPath; 401 } 402 403 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 404 405 String sysValue = System.getProperty(propertyName); 406 407 if (sysValue != null) { 408 // There is already a value set. So we do nothing but hope 409 // that there will be no conflicts 410 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 411 + " so I do NOT create it in " + parent); 412 String confValue = conf.get(propertyName); 413 if (confValue != null && !confValue.endsWith(sysValue)) { 414 LOG.warn(propertyName + " property value differs in configuration and system: " 415 + "Configuration=" + confValue + " while System=" + sysValue 416 + " Erasing configuration value by system value."); 417 } 418 conf.set(propertyName, sysValue); 419 } else { 420 // Ok, it's not set, so we create it as a subdirectory 421 createSubDir(propertyName, parent, subDirName); 422 System.setProperty(propertyName, conf.get(propertyName)); 423 } 424 } 425 426 /** 427 * @return Where to write test data on the test filesystem; Returns working directory for the test 428 * filesystem by default 429 * @see #setupDataTestDirOnTestFS() 430 * @see #getTestFileSystem() 431 */ 432 private Path getBaseTestDirOnTestFS() throws IOException { 433 FileSystem fs = getTestFileSystem(); 434 return new Path(fs.getWorkingDirectory(), "test-data"); 435 } 436 437 /** 438 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 439 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 440 * on it. 441 * @return a unique path in the test filesystem 442 */ 443 public Path getDataTestDirOnTestFS() throws IOException { 444 if (dataTestDirOnTestFS == null) { 445 setupDataTestDirOnTestFS(); 446 } 447 448 return dataTestDirOnTestFS; 449 } 450 451 /** 452 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 453 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 454 * on it. 455 * @return a unique path in the test filesystem 456 * @param subdirName name of the subdir to create under the base test dir 457 */ 458 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 459 return new Path(getDataTestDirOnTestFS(), subdirName); 460 } 461 462 /** 463 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 464 * setup. 465 */ 466 private void setupDataTestDirOnTestFS() throws IOException { 467 if (dataTestDirOnTestFS != null) { 468 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 469 return; 470 } 471 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 472 } 473 474 /** 475 * Sets up a new path in test filesystem to be used by tests. 
476 */ 477 private Path getNewDataTestDirOnTestFS() throws IOException { 478 // The file system can be either local, mini dfs, or if the configuration 479 // is supplied externally, it can be an external cluster FS. If it is a local 480 // file system, the tests should use getBaseTestDir, otherwise, we can use 481 // the working directory, and create a unique sub dir there 482 FileSystem fs = getTestFileSystem(); 483 Path newDataTestDir; 484 String randomStr = getRandomUUID().toString(); 485 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 486 newDataTestDir = new Path(getDataTestDir(), randomStr); 487 File dataTestDir = new File(newDataTestDir.toString()); 488 if (deleteOnExit()) dataTestDir.deleteOnExit(); 489 } else { 490 Path base = getBaseTestDirOnTestFS(); 491 newDataTestDir = new Path(base, randomStr); 492 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 493 } 494 return newDataTestDir; 495 } 496 497 /** 498 * Cleans the test data directory on the test filesystem. 499 * @return True if we removed the test dirs 500 */ 501 public boolean cleanupDataTestDirOnTestFS() throws IOException { 502 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 503 if (ret) { 504 dataTestDirOnTestFS = null; 505 } 506 return ret; 507 } 508 509 /** 510 * Cleans a subdirectory under the test data directory on the test filesystem. 511 * @return True if we removed child 512 */ 513 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 514 Path cpath = getDataTestDirOnTestFS(subdirName); 515 return getTestFileSystem().delete(cpath, true); 516 } 517 518 /** 519 * Start a minidfscluster. 520 * @param servers How many DNs to start. 521 * @see #shutdownMiniDFSCluster() 522 * @return The mini dfs cluster created. 523 */ 524 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 525 return startMiniDFSCluster(servers, null); 526 } 527 528 /** 529 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 530 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 531 * instances of the datanodes will have the same host name. 532 * @param hosts hostnames DNs to run on. 533 * @see #shutdownMiniDFSCluster() 534 * @return The mini dfs cluster created. 535 */ 536 public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception { 537 if (hosts != null && hosts.length != 0) { 538 return startMiniDFSCluster(hosts.length, hosts); 539 } else { 540 return startMiniDFSCluster(1, null); 541 } 542 } 543 544 /** 545 * Start a minidfscluster. Can only create one. 546 * @param servers How many DNs to start. 547 * @param hosts hostnames DNs to run on. 548 * @see #shutdownMiniDFSCluster() 549 * @return The mini dfs cluster created. 
550 */ 551 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception { 552 return startMiniDFSCluster(servers, null, hosts); 553 } 554 555 private void setFs() throws IOException { 556 if (this.dfsCluster == null) { 557 LOG.info("Skipping setting fs because dfsCluster is null"); 558 return; 559 } 560 FileSystem fs = this.dfsCluster.getFileSystem(); 561 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 562 563 // re-enable this check with dfs 564 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 565 } 566 567 // Workaround to avoid IllegalThreadStateException 568 // See HBASE-27148 for more details 569 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 570 571 private volatile boolean stopped = false; 572 573 private final MiniDFSCluster cluster; 574 575 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 576 super("FsDatasetAsyncDiskServiceFixer"); 577 setDaemon(true); 578 this.cluster = cluster; 579 } 580 581 @Override 582 public void run() { 583 while (!stopped) { 584 try { 585 Thread.sleep(30000); 586 } catch (InterruptedException e) { 587 Thread.currentThread().interrupt(); 588 continue; 589 } 590 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 591 // timeout of the thread pool executor is 60 seconds by default. 592 try { 593 for (DataNode dn : cluster.getDataNodes()) { 594 FsDatasetSpi<?> dataset = dn.getFSDataset(); 595 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 596 service.setAccessible(true); 597 Object asyncDiskService = service.get(dataset); 598 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 599 group.setAccessible(true); 600 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 601 if (threadGroup.isDaemon()) { 602 threadGroup.setDaemon(false); 603 } 604 } 605 } catch (NoSuchFieldException e) { 606 LOG.debug("NoSuchFieldException: " + e.getMessage() 607 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 608 + "See HBASE-27595 for details."); 609 } catch (Exception e) { 610 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 611 } 612 } 613 } 614 615 void shutdown() { 616 stopped = true; 617 interrupt(); 618 } 619 } 620 621 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts) 622 throws Exception { 623 createDirsAndSetProperties(); 624 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 625 626 this.dfsCluster = 627 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 628 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 629 this.dfsClusterFixer.start(); 630 // Set this just-started cluster as our filesystem. 
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir
   * -- in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) {
   *   Map.Entry<String, String> e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Default to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying this parameter on the command line. If not set, the default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable short circuit reads, unless configured differently. Sets both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
770 this.dfsCluster.shutdown(); 771 dfsCluster = null; 772 // It is possible that the dfs cluster is set through setDFSCluster method, where we will not 773 // have a fixer 774 if (dfsClusterFixer != null) { 775 this.dfsClusterFixer.shutdown(); 776 dfsClusterFixer = null; 777 } 778 dataTestDirOnTestFS = null; 779 CommonFSUtils.setFsDefault(this.conf, new Path("file:///")); 780 } 781 } 782 783 /** 784 * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All 785 * other options will use default values, defined in {@link StartTestingClusterOption.Builder}. 786 * @param numSlaves slave node number, for both HBase region server and HDFS data node. 787 * @see #startMiniCluster(StartTestingClusterOption option) 788 * @see #shutdownMiniDFSCluster() 789 */ 790 public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception { 791 StartTestingClusterOption option = StartTestingClusterOption.builder() 792 .numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 793 return startMiniCluster(option); 794 } 795 796 /** 797 * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default 798 * value can be found in {@link StartTestingClusterOption.Builder}. 799 * @see #startMiniCluster(StartTestingClusterOption option) 800 * @see #shutdownMiniDFSCluster() 801 */ 802 public SingleProcessHBaseCluster startMiniCluster() throws Exception { 803 return startMiniCluster(StartTestingClusterOption.builder().build()); 804 } 805 806 /** 807 * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies 808 * Configuration. It homes the cluster data directory under a random subdirectory in a directory 809 * under System property test.build.data, to be cleaned up on exit. 810 * @see #shutdownMiniDFSCluster() 811 */ 812 public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option) 813 throws Exception { 814 LOG.info("Starting up minicluster with option: {}", option); 815 816 // If we already put up a cluster, fail. 817 if (miniClusterRunning) { 818 throw new IllegalStateException("A mini-cluster is already running"); 819 } 820 miniClusterRunning = true; 821 822 setupClusterTestDir(); 823 824 // Bring up mini dfs cluster. This spews a bunch of warnings about missing 825 // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. 826 if (dfsCluster == null) { 827 LOG.info("STARTING DFS"); 828 dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts()); 829 } else { 830 LOG.info("NOT STARTING DFS"); 831 } 832 833 // Start up a zk cluster. 834 if (getZkCluster() == null) { 835 startMiniZKCluster(option.getNumZkServers()); 836 } 837 838 // Start the MiniHBaseCluster 839 return startMiniHBaseCluster(option); 840 } 841 842 /** 843 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 844 * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters. 845 * @return Reference to the hbase mini hbase cluster. 846 * @see #startMiniCluster(StartTestingClusterOption) 847 * @see #shutdownMiniHBaseCluster() 848 */ 849 public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option) 850 throws IOException, InterruptedException { 851 // Now do the mini hbase cluster. Set the hbase.rootdir in config. 
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the server wait until this exact number of
    // region servers are connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numRegionServers Number of region servers.
929 * @param rsPorts Ports that RegionServer should use. 930 * @return The mini HBase cluster created. 931 * @see #shutdownMiniHBaseCluster() 932 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 933 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 934 * @see #startMiniHBaseCluster(StartTestingClusterOption) 935 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 936 */ 937 @Deprecated 938 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 939 List<Integer> rsPorts) throws IOException, InterruptedException { 940 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 941 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 942 return startMiniHBaseCluster(option); 943 } 944 945 /** 946 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 947 * {@link #startMiniCluster()}. All other options will use default values, defined in 948 * {@link StartTestingClusterOption.Builder}. 949 * @param numMasters Master node number. 950 * @param numRegionServers Number of region servers. 951 * @param rsPorts Ports that RegionServer should use. 952 * @param masterClass The class to use as HMaster, or null for default. 953 * @param rsClass The class to use as HRegionServer, or null for default. 954 * @param createRootDir Whether to create a new root or data directory path. 955 * @param createWALDir Whether to create a new WAL directory. 956 * @return The mini HBase cluster created. 957 * @see #shutdownMiniHBaseCluster() 958 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 959 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 960 * @see #startMiniHBaseCluster(StartTestingClusterOption) 961 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 962 */ 963 @Deprecated 964 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 965 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 966 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass, 967 boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException { 968 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 969 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 970 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 971 return startMiniHBaseCluster(option); 972 } 973 974 /** 975 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 976 * want to keep dfs/zk up and just stop/start hbase. 
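   * <p>
   * For example (illustrative sketch only; TEST_UTIL stands for an already started instance of
   * this class):
   *
   * <pre>
   * TEST_UTIL.shutdownMiniHBaseCluster();
   * // dfs and zookeeper keep running; tweak configuration here if needed
   * TEST_UTIL.restartHBaseCluster(1);
   * </pre>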
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    Connection conn = ConnectionFactory.createConnection(this.conf);
    Table t = conn.getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
    conn.close();
  }

  /**
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is
   * true, a new root directory path is fetched irrespective of whether it has been fetched before
   * or not. If false, the previous path is used. Note: this does not cause the root dir to be
   * created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user home directory. Also creates the hbase version file.
   * Normally you won't make use of this method. The root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operations.
   * @param create This flag decides whether to get a new root or data directory path or not, if
   *               it has been fetched already. Note: Directory will be made irrespective of
   *               whether path has been fetched or not. If directory already exists, it will be
   *               overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that
   * <code>create</code> flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method. The root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operations.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
1235 */ 1236 public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { 1237 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE); 1238 } 1239 1240 /** 1241 * Create a table with multiple regions. 1242 * @param replicaCount replica count. 1243 * @return A Table instance for the created table. 1244 */ 1245 public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families) 1246 throws IOException { 1247 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount); 1248 } 1249 1250 /** 1251 * Create a table. 1252 * @return A Table instance for the created table. 1253 */ 1254 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) 1255 throws IOException { 1256 return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration())); 1257 } 1258 1259 /** 1260 * Create a table. 1261 * @param tableName the table name 1262 * @param families the families 1263 * @param splitKeys the splitkeys 1264 * @param replicaCount the region replica count 1265 * @return A Table instance for the created table. 1266 * @throws IOException throws IOException 1267 */ 1268 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1269 int replicaCount) throws IOException { 1270 return createTable(tableName, families, splitKeys, replicaCount, 1271 new Configuration(getConfiguration())); 1272 } 1273 1274 public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey, 1275 byte[] endKey, int numRegions) throws IOException { 1276 TableDescriptor desc = createTableDescriptor(tableName, families, numVersions); 1277 1278 getAdmin().createTable(desc, startKey, endKey, numRegions); 1279 // HBaseAdmin only waits for regions to appear in hbase:meta we 1280 // should wait until they are assigned 1281 waitUntilAllRegionsAssigned(tableName); 1282 return getConnection().getTable(tableName); 1283 } 1284 1285 /** 1286 * Create a table. 1287 * @param c Configuration to use 1288 * @return A Table instance for the created table. 1289 */ 1290 public Table createTable(TableDescriptor htd, byte[][] families, Configuration c) 1291 throws IOException { 1292 return createTable(htd, families, null, c); 1293 } 1294 1295 /** 1296 * Create a table. 1297 * @param htd table descriptor 1298 * @param families array of column families 1299 * @param splitKeys array of split keys 1300 * @param c Configuration to use 1301 * @return A Table instance for the created table. 1302 * @throws IOException if getAdmin or createTable fails 1303 */ 1304 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1305 Configuration c) throws IOException { 1306 // Disable blooms (they are on by default as of 0.95) but we disable them here because 1307 // tests have hard coded counts of what to expect in block cache, etc., and blooms being 1308 // on is interfering. 1309 return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c); 1310 } 1311 1312 /** 1313 * Create a table. 1314 * @param htd table descriptor 1315 * @param families array of column families 1316 * @param splitKeys array of split keys 1317 * @param type Bloom type 1318 * @param blockSize block size 1319 * @param c Configuration to use 1320 * @return A Table instance for the created table. 
1321 * @throws IOException if getAdmin or createTable fails 1322 */ 1323 1324 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1325 BloomType type, int blockSize, Configuration c) throws IOException { 1326 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1327 for (byte[] family : families) { 1328 ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family) 1329 .setBloomFilterType(type).setBlocksize(blockSize); 1330 if (isNewVersionBehaviorEnabled()) { 1331 cfdb.setNewVersionBehavior(true); 1332 } 1333 builder.setColumnFamily(cfdb.build()); 1334 } 1335 TableDescriptor td = builder.build(); 1336 if (splitKeys != null) { 1337 getAdmin().createTable(td, splitKeys); 1338 } else { 1339 getAdmin().createTable(td); 1340 } 1341 // HBaseAdmin only waits for regions to appear in hbase:meta 1342 // we should wait until they are assigned 1343 waitUntilAllRegionsAssigned(td.getTableName()); 1344 return getConnection().getTable(td.getTableName()); 1345 } 1346 1347 /** 1348 * Create a table. 1349 * @param htd table descriptor 1350 * @param splitRows array of split keys 1351 * @return A Table instance for the created table. 1352 */ 1353 public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException { 1354 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1355 if (isNewVersionBehaviorEnabled()) { 1356 for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) { 1357 builder.setColumnFamily( 1358 ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build()); 1359 } 1360 } 1361 if (splitRows != null) { 1362 getAdmin().createTable(builder.build(), splitRows); 1363 } else { 1364 getAdmin().createTable(builder.build()); 1365 } 1366 // HBaseAdmin only waits for regions to appear in hbase:meta 1367 // we should wait until they are assigned 1368 waitUntilAllRegionsAssigned(htd.getTableName()); 1369 return getConnection().getTable(htd.getTableName()); 1370 } 1371 1372 /** 1373 * Create a table. 1374 * @param tableName the table name 1375 * @param families the families 1376 * @param splitKeys the split keys 1377 * @param replicaCount the replica count 1378 * @param c Configuration to use 1379 * @return A Table instance for the created table. 1380 */ 1381 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1382 int replicaCount, final Configuration c) throws IOException { 1383 TableDescriptor htd = 1384 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build(); 1385 return createTable(htd, families, splitKeys, c); 1386 } 1387 1388 /** 1389 * Create a table. 1390 * @return A Table instance for the created table. 1391 */ 1392 public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException { 1393 return createTable(tableName, new byte[][] { family }, numVersions); 1394 } 1395 1396 /** 1397 * Create a table. 1398 * @return A Table instance for the created table. 1399 */ 1400 public Table createTable(TableName tableName, byte[][] families, int numVersions) 1401 throws IOException { 1402 return createTable(tableName, families, numVersions, (byte[][]) null); 1403 } 1404 1405 /** 1406 * Create a table. 1407 * @return A Table instance for the created table. 
1408 */ 1409 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1410 byte[][] splitKeys) throws IOException { 1411 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1412 for (byte[] family : families) { 1413 ColumnFamilyDescriptorBuilder cfBuilder = 1414 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1415 if (isNewVersionBehaviorEnabled()) { 1416 cfBuilder.setNewVersionBehavior(true); 1417 } 1418 builder.setColumnFamily(cfBuilder.build()); 1419 } 1420 if (splitKeys != null) { 1421 getAdmin().createTable(builder.build(), splitKeys); 1422 } else { 1423 getAdmin().createTable(builder.build()); 1424 } 1425 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1426 // assigned 1427 waitUntilAllRegionsAssigned(tableName); 1428 return getConnection().getTable(tableName); 1429 } 1430 1431 /** 1432 * Create a table with multiple regions. 1433 * @return A Table instance for the created table. 1434 */ 1435 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1436 throws IOException { 1437 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1438 } 1439 1440 /** 1441 * Create a table. 1442 * @return A Table instance for the created table. 1443 */ 1444 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1445 throws IOException { 1446 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1447 for (byte[] family : families) { 1448 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1449 .setMaxVersions(numVersions).setBlocksize(blockSize); 1450 if (isNewVersionBehaviorEnabled()) { 1451 cfBuilder.setNewVersionBehavior(true); 1452 } 1453 builder.setColumnFamily(cfBuilder.build()); 1454 } 1455 getAdmin().createTable(builder.build()); 1456 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1457 // assigned 1458 waitUntilAllRegionsAssigned(tableName); 1459 return getConnection().getTable(tableName); 1460 } 1461 1462 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1463 String cpName) throws IOException { 1464 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1465 for (byte[] family : families) { 1466 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1467 .setMaxVersions(numVersions).setBlocksize(blockSize); 1468 if (isNewVersionBehaviorEnabled()) { 1469 cfBuilder.setNewVersionBehavior(true); 1470 } 1471 builder.setColumnFamily(cfBuilder.build()); 1472 } 1473 if (cpName != null) { 1474 builder.setCoprocessor(cpName); 1475 } 1476 getAdmin().createTable(builder.build()); 1477 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1478 // assigned 1479 waitUntilAllRegionsAssigned(tableName); 1480 return getConnection().getTable(tableName); 1481 } 1482 1483 /** 1484 * Create a table. 1485 * @return A Table instance for the created table. 
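 *         <p>
 *         A minimal sketch (illustrative names; the max-versions values are positional, one per
 *         family):
 *         <pre>
 *         byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
 *         Table t = TEST_UTIL.createTable(TableName.valueOf("perFamilyVersions"), families, new int[] { 1, 5 });
 *         </pre>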
1486 */ 1487 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1488 throws IOException { 1489 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1490 int i = 0; 1491 for (byte[] family : families) { 1492 ColumnFamilyDescriptorBuilder cfBuilder = 1493 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1494 if (isNewVersionBehaviorEnabled()) { 1495 cfBuilder.setNewVersionBehavior(true); 1496 } 1497 builder.setColumnFamily(cfBuilder.build()); 1498 i++; 1499 } 1500 getAdmin().createTable(builder.build()); 1501 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1502 // assigned 1503 waitUntilAllRegionsAssigned(tableName); 1504 return getConnection().getTable(tableName); 1505 } 1506 1507 /** 1508 * Create a table. 1509 * @return A Table instance for the created table. 1510 */ 1511 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1512 throws IOException { 1513 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1514 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1515 if (isNewVersionBehaviorEnabled()) { 1516 cfBuilder.setNewVersionBehavior(true); 1517 } 1518 builder.setColumnFamily(cfBuilder.build()); 1519 getAdmin().createTable(builder.build(), splitRows); 1520 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1521 // assigned 1522 waitUntilAllRegionsAssigned(tableName); 1523 return getConnection().getTable(tableName); 1524 } 1525 1526 /** 1527 * Create a table with multiple regions. 1528 * @return A Table instance for the created table. 1529 */ 1530 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1531 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1532 } 1533 1534 /** 1535 * Set the number of Region replicas. 1536 */ 1537 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1538 throws IOException, InterruptedException { 1539 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1540 .setRegionReplication(replicaCount).build(); 1541 admin.modifyTable(desc); 1542 } 1543 1544 /** 1545 * Set the number of Region replicas. 
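 * <p>
 * A minimal sketch, assuming the test holds an {@link AsyncAdmin} (for example from
 * {@link #getAsyncConnection()}) and an existing table; the table name is illustrative:
 * <pre>
 * AsyncAdmin asyncAdmin = TEST_UTIL.getAsyncConnection().getAdmin();
 * HBaseTestingUtil.setReplicas(asyncAdmin, TableName.valueOf("testtable"), 3);
 * </pre>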
1546 */ 1547 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1548 throws ExecutionException, IOException, InterruptedException { 1549 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1550 .setRegionReplication(replicaCount).build(); 1551 admin.modifyTable(desc).get(); 1552 } 1553 1554 /** 1555 * Drop an existing table 1556 * @param tableName existing table 1557 */ 1558 public void deleteTable(TableName tableName) throws IOException { 1559 try { 1560 getAdmin().disableTable(tableName); 1561 } catch (TableNotEnabledException e) { 1562 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1563 } 1564 getAdmin().deleteTable(tableName); 1565 } 1566 1567 /** 1568 * Drop an existing table 1569 * @param tableName existing table 1570 */ 1571 public void deleteTableIfAny(TableName tableName) throws IOException { 1572 try { 1573 deleteTable(tableName); 1574 } catch (TableNotFoundException e) { 1575 // ignore 1576 } 1577 } 1578 1579 // ========================================================================== 1580 // Canned table and table descriptor creation 1581 1582 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1583 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1584 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1585 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1586 private static final int MAXVERSIONS = 3; 1587 1588 public static final char FIRST_CHAR = 'a'; 1589 public static final char LAST_CHAR = 'z'; 1590 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1591 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1592 1593 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1594 return createModifyableTableDescriptor(TableName.valueOf(name), 1595 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1596 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1597 } 1598 1599 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1600 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1601 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1602 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1603 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1604 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1605 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1606 if (isNewVersionBehaviorEnabled()) { 1607 cfBuilder.setNewVersionBehavior(true); 1608 } 1609 builder.setColumnFamily(cfBuilder.build()); 1610 } 1611 return builder.build(); 1612 } 1613 1614 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1615 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1616 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1617 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1618 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1619 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1620 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1621 if (isNewVersionBehaviorEnabled()) { 1622 cfBuilder.setNewVersionBehavior(true); 1623 } 1624 builder.setColumnFamily(cfBuilder.build()); 
1625 } 1626 return builder; 1627 } 1628 1629 /** 1630 * Create a table of name <code>name</code>. 1631 * @param name Name to give table. 1632 * @return Column descriptor. 1633 */ 1634 public TableDescriptor createTableDescriptor(final TableName name) { 1635 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1636 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1637 } 1638 1639 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1640 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1641 } 1642 1643 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1644 int maxVersions) { 1645 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1646 for (byte[] family : families) { 1647 ColumnFamilyDescriptorBuilder cfBuilder = 1648 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1649 if (isNewVersionBehaviorEnabled()) { 1650 cfBuilder.setNewVersionBehavior(true); 1651 } 1652 builder.setColumnFamily(cfBuilder.build()); 1653 } 1654 return builder.build(); 1655 } 1656 1657 /** 1658 * Create an HRegion that writes to the local tmp dirs 1659 * @param desc a table descriptor indicating which table the region belongs to 1660 * @param startKey the start boundary of the region 1661 * @param endKey the end boundary of the region 1662 * @return a region that writes to local dir for testing 1663 */ 1664 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1665 throws IOException { 1666 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1667 .setEndKey(endKey).build(); 1668 return createLocalHRegion(hri, desc); 1669 } 1670 1671 /** 1672 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call 1673 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it. 1674 */ 1675 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1676 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1677 } 1678 1679 /** 1680 * Create an HRegion that writes to the local tmp dirs with specified wal 1681 * @param info regioninfo 1682 * @param conf configuration 1683 * @param desc table descriptor 1684 * @param wal wal for this region. 1685 * @return created hregion 1686 */ 1687 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1688 WAL wal) throws IOException { 1689 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 1690 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1691 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1692 } 1693 1694 /** 1695 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} 1696 * when done. 1697 */ 1698 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1699 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... 
families) 1700 throws IOException { 1701 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1702 durability, wal, null, families); 1703 } 1704 1705 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1706 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1707 boolean[] compactedMemStore, byte[]... families) throws IOException { 1708 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1709 builder.setReadOnly(isReadOnly); 1710 int i = 0; 1711 for (byte[] family : families) { 1712 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1713 if (compactedMemStore != null && i < compactedMemStore.length) { 1714 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1715 } else { 1716 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1717 1718 } 1719 i++; 1720 // Set default to be three versions. 1721 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1722 builder.setColumnFamily(cfBuilder.build()); 1723 } 1724 builder.setDurability(durability); 1725 RegionInfo info = 1726 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1727 return createLocalHRegion(info, conf, builder.build(), wal); 1728 } 1729 1730 // 1731 // ========================================================================== 1732 1733 /** 1734 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1735 * read. 1736 * @param tableName existing table 1737 * @return HTable to that new table 1738 */ 1739 public Table deleteTableData(TableName tableName) throws IOException { 1740 Table table = getConnection().getTable(tableName); 1741 Scan scan = new Scan(); 1742 ResultScanner resScan = table.getScanner(scan); 1743 for (Result res : resScan) { 1744 Delete del = new Delete(res.getRow()); 1745 table.delete(del); 1746 } 1747 resScan = table.getScanner(scan); 1748 resScan.close(); 1749 return table; 1750 } 1751 1752 /** 1753 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1754 * table. 1755 * @param tableName table which must exist. 1756 * @param preserveRegions keep the existing split points 1757 * @return HTable for the new table 1758 */ 1759 public Table truncateTable(final TableName tableName, final boolean preserveRegions) 1760 throws IOException { 1761 Admin admin = getAdmin(); 1762 if (!admin.isTableDisabled(tableName)) { 1763 admin.disableTable(tableName); 1764 } 1765 admin.truncateTable(tableName, preserveRegions); 1766 return getConnection().getTable(tableName); 1767 } 1768 1769 /** 1770 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1771 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not 1772 * preserve regions of existing table. 1773 * @param tableName table which must exist. 1774 * @return HTable for the new table 1775 */ 1776 public Table truncateTable(final TableName tableName) throws IOException { 1777 return truncateTable(tableName, false); 1778 } 1779 1780 /** 1781 * Load table with rows from 'aaa' to 'zzz'. 1782 * @param t Table 1783 * @param f Family 1784 * @return Count of rows loaded. 1785 */ 1786 public int loadTable(final Table t, final byte[] f) throws IOException { 1787 return loadTable(t, new byte[][] { f }); 1788 } 1789 1790 /** 1791 * Load table with rows from 'aaa' to 'zzz'. 
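 * <p>
 * A typical pattern in a test, shown here as a sketch ({@code TEST_UTIL} is a test-owned
 * instance of this class; the table and family names are made up):
 * <pre>
 * byte[] family = Bytes.toBytes("cf");
 * Table table = TEST_UTIL.createMultiRegionTable(TableName.valueOf("loadTest"), family);
 * int loaded = TEST_UTIL.loadTable(table, family, false); // false: skip the WAL for speed
 * assertEquals(loaded, HBaseTestingUtil.countRows(table));
 * </pre>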
1792 * @param t Table 1793 * @param f Family 1794 * @return Count of rows loaded. 1795 */ 1796 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1797 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1798 } 1799 1800 /** 1801 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1802 * @param t Table 1803 * @param f Array of Families to load 1804 * @return Count of rows loaded. 1805 */ 1806 public int loadTable(final Table t, final byte[][] f) throws IOException { 1807 return loadTable(t, f, null); 1808 } 1809 1810 /** 1811 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1812 * @param t Table 1813 * @param f Array of Families to load 1814 * @param value the values of the cells. If null is passed, the row key is used as value 1815 * @return Count of rows loaded. 1816 */ 1817 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1818 return loadTable(t, f, value, true); 1819 } 1820 1821 /** 1822 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1823 * @param t Table 1824 * @param f Array of Families to load 1825 * @param value the values of the cells. If null is passed, the row key is used as value 1826 * @return Count of rows loaded. 1827 */ 1828 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1829 throws IOException { 1830 List<Put> puts = new ArrayList<>(); 1831 for (byte[] row : HBaseTestingUtil.ROWS) { 1832 Put put = new Put(row); 1833 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1834 for (int i = 0; i < f.length; i++) { 1835 byte[] value1 = value != null ? value : row; 1836 put.addColumn(f[i], f[i], value1); 1837 } 1838 puts.add(put); 1839 } 1840 t.put(puts); 1841 return puts.size(); 1842 } 1843 1844 /** 1845 * A tracker for tracking and validating table rows generated with 1846 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1847 */ 1848 public static class SeenRowTracker { 1849 int dim = 'z' - 'a' + 1; 1850 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1851 byte[] startRow; 1852 byte[] stopRow; 1853 1854 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1855 this.startRow = startRow; 1856 this.stopRow = stopRow; 1857 } 1858 1859 void reset() { 1860 for (byte[] row : ROWS) { 1861 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1862 } 1863 } 1864 1865 int i(byte b) { 1866 return b - 'a'; 1867 } 1868 1869 public void addRow(byte[] row) { 1870 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1871 } 1872 1873 /** 1874 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1875 * rows none 1876 */ 1877 public void validate() { 1878 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1879 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1880 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1881 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1882 int expectedCount = 0; 1883 if ( 1884 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1885 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1886 ) { 1887 expectedCount = 1; 1888 } 1889 if (count != expectedCount) { 1890 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1891 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1892 + "instead of " + expectedCount); 1893 } 1894 } 1895 } 1896 } 1897 } 1898 } 1899 1900 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1901 return 
loadRegion(r, f, false); 1902 } 1903 1904 public int loadRegion(final Region r, final byte[] f) throws IOException { 1905 return loadRegion((HRegion) r, f); 1906 } 1907 1908 /** 1909 * Load region with rows from 'aaa' to 'zzz'. 1910 * @param r Region 1911 * @param f Family 1912 * @param flush flush the cache if true 1913 * @return Count of rows loaded. 1914 */ 1915 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1916 byte[] k = new byte[3]; 1917 int rowCount = 0; 1918 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1919 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1920 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1921 k[0] = b1; 1922 k[1] = b2; 1923 k[2] = b3; 1924 Put put = new Put(k); 1925 put.setDurability(Durability.SKIP_WAL); 1926 put.addColumn(f, null, k); 1927 if (r.getWAL() == null) { 1928 put.setDurability(Durability.SKIP_WAL); 1929 } 1930 int preRowCount = rowCount; 1931 int pause = 10; 1932 int maxPause = 1000; 1933 while (rowCount == preRowCount) { 1934 try { 1935 r.put(put); 1936 rowCount++; 1937 } catch (RegionTooBusyException e) { 1938 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1939 Threads.sleep(pause); 1940 } 1941 } 1942 } 1943 } 1944 if (flush) { 1945 r.flush(true); 1946 } 1947 } 1948 return rowCount; 1949 } 1950 1951 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1952 throws IOException { 1953 for (int i = startRow; i < endRow; i++) { 1954 byte[] data = Bytes.toBytes(String.valueOf(i)); 1955 Put put = new Put(data); 1956 put.addColumn(f, null, data); 1957 t.put(put); 1958 } 1959 } 1960 1961 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1962 throws IOException { 1963 for (int i = 0; i < totalRows; i++) { 1964 byte[] row = new byte[rowSize]; 1965 Bytes.random(row); 1966 Put put = new Put(row); 1967 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 1968 t.put(put); 1969 } 1970 } 1971 1972 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 1973 int replicaId) throws IOException { 1974 for (int i = startRow; i < endRow; i++) { 1975 String failMsg = "Failed verification of row :" + i; 1976 byte[] data = Bytes.toBytes(String.valueOf(i)); 1977 Get get = new Get(data); 1978 get.setReplicaId(replicaId); 1979 get.setConsistency(Consistency.TIMELINE); 1980 Result result = table.get(get); 1981 assertTrue(failMsg, result.containsColumn(f, null)); 1982 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 1983 Cell cell = result.getColumnLatestCell(f, null); 1984 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 1985 cell.getValueOffset(), cell.getValueLength())); 1986 } 1987 } 1988 1989 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 1990 throws IOException { 1991 verifyNumericRows((HRegion) region, f, startRow, endRow); 1992 } 1993 1994 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 1995 throws IOException { 1996 verifyNumericRows(region, f, startRow, endRow, true); 1997 } 1998 1999 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2000 final boolean present) throws IOException { 2001 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2002 } 2003 2004 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2005 final boolean present) throws IOException { 2006 for (int i = startRow; i < endRow; i++) { 2007 String failMsg = "Failed 
verification of row :" + i; 2008 byte[] data = Bytes.toBytes(String.valueOf(i)); 2009 Result result = region.get(new Get(data)); 2010 2011 boolean hasResult = result != null && !result.isEmpty(); 2012 assertEquals(failMsg + result, present, hasResult); 2013 if (!present) continue; 2014 2015 assertTrue(failMsg, result.containsColumn(f, null)); 2016 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2017 Cell cell = result.getColumnLatestCell(f, null); 2018 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2019 cell.getValueOffset(), cell.getValueLength())); 2020 } 2021 } 2022 2023 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2024 throws IOException { 2025 for (int i = startRow; i < endRow; i++) { 2026 byte[] data = Bytes.toBytes(String.valueOf(i)); 2027 Delete delete = new Delete(data); 2028 delete.addFamily(f); 2029 t.delete(delete); 2030 } 2031 } 2032 2033 /** 2034 * Return the number of rows in the given table. 2035 * @param table to count rows 2036 * @return count of rows 2037 */ 2038 public static int countRows(final Table table) throws IOException { 2039 return countRows(table, new Scan()); 2040 } 2041 2042 public static int countRows(final Table table, final Scan scan) throws IOException { 2043 try (ResultScanner results = table.getScanner(scan)) { 2044 int count = 0; 2045 while (results.next() != null) { 2046 count++; 2047 } 2048 return count; 2049 } 2050 } 2051 2052 public static int countRows(final Table table, final byte[]... families) throws IOException { 2053 Scan scan = new Scan(); 2054 for (byte[] family : families) { 2055 scan.addFamily(family); 2056 } 2057 return countRows(table, scan); 2058 } 2059 2060 /** 2061 * Return the number of rows in the given table. 2062 */ 2063 public int countRows(final TableName tableName) throws IOException { 2064 try (Table table = getConnection().getTable(tableName)) { 2065 return countRows(table); 2066 } 2067 } 2068 2069 public static int countRows(final Region region) throws IOException { 2070 return countRows(region, new Scan()); 2071 } 2072 2073 public static int countRows(final Region region, final Scan scan) throws IOException { 2074 try (InternalScanner scanner = region.getScanner(scan)) { 2075 return countRows(scanner); 2076 } 2077 } 2078 2079 public static int countRows(final InternalScanner scanner) throws IOException { 2080 int scannedCount = 0; 2081 List<Cell> results = new ArrayList<>(); 2082 boolean hasMore = true; 2083 while (hasMore) { 2084 hasMore = scanner.next(results); 2085 scannedCount += results.size(); 2086 results.clear(); 2087 } 2088 return scannedCount; 2089 } 2090 2091 /** 2092 * Return an md5 digest of the entire contents of a table. 
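 * <p>
 * Mainly useful for asserting that a table's row set survived some operation, e.g. (sketch):
 * <pre>
 * String before = TEST_UTIL.checksumRows(table);
 * // ... restart a region server, split a region, etc. ...
 * String after = TEST_UTIL.checksumRows(table);
 * assertEquals(before, after);
 * </pre>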
2093 */ 2094 public String checksumRows(final Table table) throws Exception { 2095 MessageDigest digest = MessageDigest.getInstance("MD5"); 2096 try (ResultScanner results = table.getScanner(new Scan())) { 2097 for (Result res : results) { 2098 digest.update(res.getRow()); 2099 } 2100 } 2101 return digest.toString(); 2102 } 2103 2104 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2105 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2106 static { 2107 int i = 0; 2108 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2109 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2110 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2111 ROWS[i][0] = b1; 2112 ROWS[i][1] = b2; 2113 ROWS[i][2] = b3; 2114 i++; 2115 } 2116 } 2117 } 2118 } 2119 2120 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2121 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2122 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2123 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2124 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2125 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2126 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2127 2128 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2129 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2130 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2131 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2132 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2133 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2134 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2135 2136 /** 2137 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2138 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 2139 * @return list of region info for regions added to meta 2140 */ 2141 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2142 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2143 try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 2144 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2145 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2146 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2147 TableState.State.ENABLED); 2148 // add custom ones 2149 for (int i = 0; i < startKeys.length; i++) { 2150 int j = (i + 1) % startKeys.length; 2151 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2152 .setEndKey(startKeys[j]).build(); 2153 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2154 newRegions.add(hri); 2155 } 2156 return newRegions; 2157 } 2158 } 2159 2160 /** 2161 * Create an unmanaged WAL. Be sure to close it when you're through. 
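 * <p>
 * A minimal sketch of standalone use; {@code rootDir} and {@code regionInfo} are whatever the
 * test has set up, and most callers go through
 * {@link #createRegionAndWAL(RegionInfo, Path, Configuration, TableDescriptor)} instead:
 * <pre>
 * WAL wal = HBaseTestingUtil.createWal(TEST_UTIL.getConfiguration(), rootDir, regionInfo);
 * try {
 *   // append edits, open a region against it, etc.
 * } finally {
 *   wal.close();
 * }
 * </pre>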
2162 */ 2163 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri) 2164 throws IOException { 2165 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 2166 // unless I pass along via the conf. 2167 Configuration confForWAL = new Configuration(conf); 2168 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 2169 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); 2170 } 2171 2172 /** 2173 * Create a region with it's own WAL. Be sure to call 2174 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2175 */ 2176 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2177 final Configuration conf, final TableDescriptor htd) throws IOException { 2178 return createRegionAndWAL(info, rootDir, conf, htd, true); 2179 } 2180 2181 /** 2182 * Create a region with it's own WAL. Be sure to call 2183 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2184 */ 2185 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2186 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException { 2187 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2188 region.setBlockCache(blockCache); 2189 region.initialize(); 2190 return region; 2191 } 2192 2193 /** 2194 * Create a region with it's own WAL. Be sure to call 2195 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2196 */ 2197 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2198 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache) 2199 throws IOException { 2200 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2201 region.setMobFileCache(mobFileCache); 2202 region.initialize(); 2203 return region; 2204 } 2205 2206 /** 2207 * Create a region with it's own WAL. Be sure to call 2208 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2209 */ 2210 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2211 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2212 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2213 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2214 WAL wal = createWal(conf, rootDir, info); 2215 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2216 } 2217 2218 /** 2219 * Find any other region server which is different from the one identified by parameter 2220 * @return another region server 2221 */ 2222 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2223 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2224 if (!(rst.getRegionServer() == rs)) { 2225 return rst.getRegionServer(); 2226 } 2227 } 2228 return null; 2229 } 2230 2231 /** 2232 * Tool to get the reference to the region server object that holds the region of the specified 2233 * user table. 
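 * <p>
 * For example, a test that wants to disturb the server hosting a table's first region might do
 * (sketch; the table name is illustrative):
 * <pre>
 * HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf("victimTable"));
 * ServerName where = rs == null ? null : rs.getServerName();
 * // the test can then stop, abort or expire that specific server
 * </pre>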
2234 * @param tableName user table to lookup in hbase:meta 2235 * @return region server that holds it, null if the row doesn't exist 2236 */ 2237 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2238 throws IOException, InterruptedException { 2239 List<RegionInfo> regions = getAdmin().getRegions(tableName); 2240 if (regions == null || regions.isEmpty()) { 2241 return null; 2242 } 2243 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2244 2245 byte[] firstRegionName = 2246 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2247 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2248 2249 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2250 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2251 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2252 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2253 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2254 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2255 while (retrier.shouldRetry()) { 2256 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2257 if (index != -1) { 2258 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2259 } 2260 // Came back -1. Region may not be online yet. Sleep a while. 2261 retrier.sleepUntilNextRetry(); 2262 } 2263 return null; 2264 } 2265 2266 /** 2267 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2268 * @throws IOException When starting the cluster fails. 2269 */ 2270 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2271 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 2272 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2273 "99.0"); 2274 startMiniMapReduceCluster(2); 2275 return mrCluster; 2276 } 2277 2278 /** 2279 * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its 2280 * internal static LOG_DIR variable. 2281 */ 2282 private void forceChangeTaskLogDir() { 2283 Field logDirField; 2284 try { 2285 logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); 2286 logDirField.setAccessible(true); 2287 2288 Field modifiersField = ReflectionUtils.getModifiersField(); 2289 modifiersField.setAccessible(true); 2290 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); 2291 2292 logDirField.set(null, new File(hadoopLogDir, "userlogs")); 2293 } catch (SecurityException e) { 2294 throw new RuntimeException(e); 2295 } catch (NoSuchFieldException e) { 2296 throw new RuntimeException(e); 2297 } catch (IllegalArgumentException e) { 2298 throw new RuntimeException(e); 2299 } catch (IllegalAccessException e) { 2300 throw new RuntimeException(e); 2301 } 2302 } 2303 2304 /** 2305 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2306 * filesystem. 2307 * @param servers The number of <code>TaskTracker</code>'s to start. 2308 * @throws IOException When starting the cluster fails. 
2309 */ 2310 private void startMiniMapReduceCluster(final int servers) throws IOException { 2311 if (mrCluster != null) { 2312 throw new IllegalStateException("MiniMRCluster is already running"); 2313 } 2314 LOG.info("Starting mini mapreduce cluster..."); 2315 setupClusterTestDir(); 2316 createDirsAndSetProperties(); 2317 2318 forceChangeTaskLogDir(); 2319 2320 //// hadoop2 specific settings 2321 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2322 // we up the VM usable so that processes don't get killed. 2323 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2324 2325 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2326 // this avoids the problem by disabling speculative task execution in tests. 2327 conf.setBoolean("mapreduce.map.speculative", false); 2328 conf.setBoolean("mapreduce.reduce.speculative", false); 2329 //// 2330 2331 // Yarn container runs in independent JVM. We need to pass the argument manually here if the 2332 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 2333 if (JVM.getJVMSpecVersion() >= 17) { 2334 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2335 conf.set("yarn.app.mapreduce.am.command-opts", 2336 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2337 } 2338 2339 // Allow the user to override FS URI for this map-reduce cluster to use. 2340 mrCluster = 2341 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2342 1, null, null, new JobConf(this.conf)); 2343 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2344 if (jobConf == null) { 2345 jobConf = mrCluster.createJobConf(); 2346 } 2347 2348 // Hadoop MiniMR overwrites this while it should not 2349 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2350 LOG.info("Mini mapreduce cluster started"); 2351 2352 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2353 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2354 // necessary config properties here. YARN-129 required adding a few properties. 2355 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2356 // this for mrv2 support; mr1 ignores this 2357 conf.set("mapreduce.framework.name", "yarn"); 2358 conf.setBoolean("yarn.is.minicluster", true); 2359 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2360 if (rmAddress != null) { 2361 conf.set("yarn.resourcemanager.address", rmAddress); 2362 } 2363 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2364 if (historyAddress != null) { 2365 conf.set("mapreduce.jobhistory.address", historyAddress); 2366 } 2367 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2368 if (schedulerAddress != null) { 2369 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2370 } 2371 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2372 if (mrJobHistoryWebappAddress != null) { 2373 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2374 } 2375 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2376 if (yarnRMWebappAddress != null) { 2377 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2378 } 2379 } 2380 2381 /** 2382 * Stops the previously started <code>MiniMRCluster</code>. 
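 * <p>
 * Typically paired with {@link #startMiniMapReduceCluster()} around a MapReduce-based test,
 * e.g. (sketch):
 * <pre>
 * TEST_UTIL.startMiniMapReduceCluster();
 * try {
 *   // run the MapReduce job under test
 * } finally {
 *   TEST_UTIL.shutdownMiniMapReduceCluster();
 * }
 * </pre>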
2383 */ 2384 public void shutdownMiniMapReduceCluster() { 2385 if (mrCluster != null) { 2386 LOG.info("Stopping mini mapreduce cluster..."); 2387 mrCluster.shutdown(); 2388 mrCluster = null; 2389 LOG.info("Mini mapreduce cluster stopped"); 2390 } 2391 // Restore configuration to point to local jobtracker 2392 conf.set("mapreduce.jobtracker.address", "local"); 2393 } 2394 2395 /** 2396 * Create a stubbed out RegionServerService, mainly for getting FS. 2397 */ 2398 public RegionServerServices createMockRegionServerService() throws IOException { 2399 return createMockRegionServerService((ServerName) null); 2400 } 2401 2402 /** 2403 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2404 * TestTokenAuthentication 2405 */ 2406 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) 2407 throws IOException { 2408 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher()); 2409 rss.setFileSystem(getTestFileSystem()); 2410 rss.setRpcServer(rpc); 2411 return rss; 2412 } 2413 2414 /** 2415 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2416 * TestOpenRegionHandler 2417 */ 2418 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException { 2419 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name); 2420 rss.setFileSystem(getTestFileSystem()); 2421 return rss; 2422 } 2423 2424 /** 2425 * Expire the Master's session 2426 */ 2427 public void expireMasterSession() throws Exception { 2428 HMaster master = getMiniHBaseCluster().getMaster(); 2429 expireSession(master.getZooKeeper(), false); 2430 } 2431 2432 /** 2433 * Expire a region server's session 2434 * @param index which RS 2435 */ 2436 public void expireRegionServerSession(int index) throws Exception { 2437 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); 2438 expireSession(rs.getZooKeeper(), false); 2439 decrementMinRegionServerCount(); 2440 } 2441 2442 private void decrementMinRegionServerCount() { 2443 // decrement the count for this.conf, for newly spwaned master 2444 // this.hbaseCluster shares this configuration too 2445 decrementMinRegionServerCount(getConfiguration()); 2446 2447 // each master thread keeps a copy of configuration 2448 for (MasterThread master : getHBaseCluster().getMasterThreads()) { 2449 decrementMinRegionServerCount(master.getMaster().getConfiguration()); 2450 } 2451 } 2452 2453 private void decrementMinRegionServerCount(Configuration conf) { 2454 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 2455 if (currentCount != -1) { 2456 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1)); 2457 } 2458 } 2459 2460 public void expireSession(ZKWatcher nodeZK) throws Exception { 2461 expireSession(nodeZK, false); 2462 } 2463 2464 /** 2465 * Expire a ZooKeeper session as recommended in ZooKeeper documentation 2466 * http://hbase.apache.org/book.html#trouble.zookeeper 2467 * <p/> 2468 * There are issues when doing this: 2469 * <ol> 2470 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li> 2471 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li> 2472 * </ol> 2473 * @param nodeZK - the ZK watcher to expire 2474 * @param checkStatus - true to check if we can create a Table with the current configuration. 
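 * <p>
 * A sketch of the common case: expire a region server's session and then verify the cluster
 * still answers requests:
 * <pre>
 * HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
 * TEST_UTIL.expireSession(rs.getZooKeeper(), true);
 * </pre>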
2475 */ 2476 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception { 2477 Configuration c = new Configuration(this.conf); 2478 String quorumServers = ZKConfig.getZKQuorumServersString(c); 2479 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); 2480 byte[] password = zk.getSessionPasswd(); 2481 long sessionID = zk.getSessionId(); 2482 2483 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]), 2484 // so we create a first watcher to be sure that the 2485 // event was sent. We expect that if our watcher receives the event 2486 // other watchers on the same machine will get is as well. 2487 // When we ask to close the connection, ZK does not close it before 2488 // we receive all the events, so don't have to capture the event, just 2489 // closing the connection should be enough. 2490 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() { 2491 @Override 2492 public void process(WatchedEvent watchedEvent) { 2493 LOG.info("Monitor ZKW received event=" + watchedEvent); 2494 } 2495 }, sessionID, password); 2496 2497 // Making it expire 2498 ZooKeeper newZK = 2499 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password); 2500 2501 // ensure that we have connection to the server before closing down, otherwise 2502 // the close session event will be eaten out before we start CONNECTING state 2503 long start = EnvironmentEdgeManager.currentTime(); 2504 while ( 2505 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000 2506 ) { 2507 Thread.sleep(1); 2508 } 2509 newZK.close(); 2510 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID)); 2511 2512 // Now closing & waiting to be sure that the clients get it. 2513 monitor.close(); 2514 2515 if (checkStatus) { 2516 getConnection().getTable(TableName.META_TABLE_NAME).close(); 2517 } 2518 } 2519 2520 /** 2521 * Get the Mini HBase cluster. 2522 * @return hbase cluster 2523 * @see #getHBaseClusterInterface() 2524 */ 2525 public SingleProcessHBaseCluster getHBaseCluster() { 2526 return getMiniHBaseCluster(); 2527 } 2528 2529 /** 2530 * Returns the HBaseCluster instance. 2531 * <p> 2532 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this 2533 * should not assume that the cluster is a mini cluster or a distributed one. If the test only 2534 * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used 2535 * instead w/o the need to type-cast. 2536 */ 2537 public HBaseClusterInterface getHBaseClusterInterface() { 2538 // implementation note: we should rename this method as #getHBaseCluster(), 2539 // but this would require refactoring 90+ calls. 2540 return hbaseCluster; 2541 } 2542 2543 /** 2544 * Resets the connections so that the next time getConnection() is called, a new connection is 2545 * created. This is needed in cases where the entire cluster / all the masters are shutdown and 2546 * the connection is not valid anymore. 2547 * <p/> 2548 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are 2549 * written, not all start() stop() calls go through this class. Most tests directly operate on the 2550 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain 2551 * the connection state automatically. Cleaning this is a much bigger refactor. 
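 * <p>
 * A minimal sketch of the intended call site, after all masters have been restarted:
 * <pre>
 * // ... shut down and restart the masters ...
 * TEST_UTIL.invalidateConnection();
 * Admin admin = TEST_UTIL.getAdmin(); // now backed by a freshly created connection
 * </pre>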
2552 */ 2553 public void invalidateConnection() throws IOException { 2554 closeConnection(); 2555 // Update the master addresses if they changed. 2556 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY); 2557 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY); 2558 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}", 2559 masterConfigBefore, masterConfAfter); 2560 conf.set(HConstants.MASTER_ADDRS_KEY, 2561 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY)); 2562 } 2563 2564 /** 2565 * Get a shared Connection to the cluster. this method is thread safe. 2566 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. 2567 */ 2568 public Connection getConnection() throws IOException { 2569 return getAsyncConnection().toConnection(); 2570 } 2571 2572 /** 2573 * Get a assigned Connection to the cluster. this method is thread safe. 2574 * @param user assigned user 2575 * @return A Connection with assigned user. 2576 */ 2577 public Connection getConnection(User user) throws IOException { 2578 return getAsyncConnection(user).toConnection(); 2579 } 2580 2581 /** 2582 * Get a shared AsyncClusterConnection to the cluster. this method is thread safe. 2583 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown 2584 * of cluster. 2585 */ 2586 public AsyncClusterConnection getAsyncConnection() throws IOException { 2587 try { 2588 return asyncConnection.updateAndGet(connection -> { 2589 if (connection == null) { 2590 try { 2591 User user = UserProvider.instantiate(conf).getCurrent(); 2592 connection = getAsyncConnection(user); 2593 } catch (IOException ioe) { 2594 throw new UncheckedIOException("Failed to create connection", ioe); 2595 } 2596 } 2597 return connection; 2598 }); 2599 } catch (UncheckedIOException exception) { 2600 throw exception.getCause(); 2601 } 2602 } 2603 2604 /** 2605 * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe. 2606 * @param user assigned user 2607 * @return An AsyncClusterConnection with assigned user. 2608 */ 2609 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2610 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2611 } 2612 2613 public void closeConnection() throws IOException { 2614 if (hbaseAdmin != null) { 2615 Closeables.close(hbaseAdmin, true); 2616 hbaseAdmin = null; 2617 } 2618 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2619 if (asyncConnection != null) { 2620 Closeables.close(asyncConnection, true); 2621 } 2622 } 2623 2624 /** 2625 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2626 * it has no effect, it will be closed automatically when the cluster shutdowns 2627 */ 2628 public Admin getAdmin() throws IOException { 2629 if (hbaseAdmin == null) { 2630 this.hbaseAdmin = getConnection().getAdmin(); 2631 } 2632 return hbaseAdmin; 2633 } 2634 2635 private Admin hbaseAdmin = null; 2636 2637 /** 2638 * Returns an {@link Hbck} instance. Needs be closed when done. 2639 */ 2640 public Hbck getHbck() throws IOException { 2641 return getConnection().getHbck(); 2642 } 2643 2644 /** 2645 * Unassign the named region. 2646 * @param regionName The region to unassign. 
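 * <p>
 * A minimal sketch ({@code tableName} is the test's table; the first region is picked
 * arbitrarily):
 * <pre>
 * TEST_UTIL.unassignRegion(
 *   TEST_UTIL.getAdmin().getRegions(tableName).get(0).getRegionNameAsString());
 * </pre>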
2647 */ 2648 public void unassignRegion(String regionName) throws IOException { 2649 unassignRegion(Bytes.toBytes(regionName)); 2650 } 2651 2652 /** 2653 * Unassign the named region. 2654 * @param regionName The region to unassign. 2655 */ 2656 public void unassignRegion(byte[] regionName) throws IOException { 2657 getAdmin().unassign(regionName); 2658 } 2659 2660 /** 2661 * Closes the region containing the given row. 2662 * @param row The row to find the containing region. 2663 * @param table The table to find the region. 2664 */ 2665 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 2666 unassignRegionByRow(Bytes.toBytes(row), table); 2667 } 2668 2669 /** 2670 * Closes the region containing the given row. 2671 * @param row The row to find the containing region. 2672 * @param table The table to find the region. 2673 */ 2674 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 2675 HRegionLocation hrl = table.getRegionLocation(row); 2676 unassignRegion(hrl.getRegion().getRegionName()); 2677 } 2678 2679 /** 2680 * Retrieves a splittable region randomly from tableName 2681 * @param tableName name of table 2682 * @param maxAttempts maximum number of attempts, unlimited for value of -1 2683 * @return the HRegion chosen, null if none was found within limit of maxAttempts 2684 */ 2685 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 2686 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 2687 int regCount = regions.size(); 2688 Set<Integer> attempted = new HashSet<>(); 2689 int idx; 2690 int attempts = 0; 2691 do { 2692 regions = getHBaseCluster().getRegions(tableName); 2693 if (regCount != regions.size()) { 2694 // if there was region movement, clear attempted Set 2695 attempted.clear(); 2696 } 2697 regCount = regions.size(); 2698 // There are chances that before we get the region for the table from an RS the region may 2699 // be going for CLOSE. This may be because online schema change is enabled 2700 if (regCount > 0) { 2701 idx = ThreadLocalRandom.current().nextInt(regCount); 2702 // if we have just tried this region, there is no need to try again 2703 if (attempted.contains(idx)) { 2704 continue; 2705 } 2706 HRegion region = regions.get(idx); 2707 if (region.checkSplit().isPresent()) { 2708 return region; 2709 } 2710 attempted.add(idx); 2711 } 2712 attempts++; 2713 } while (maxAttempts == -1 || attempts < maxAttempts); 2714 return null; 2715 } 2716 2717 public MiniDFSCluster getDFSCluster() { 2718 return dfsCluster; 2719 } 2720 2721 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException { 2722 setDFSCluster(cluster, true); 2723 } 2724 2725 /** 2726 * Set the MiniDFSCluster 2727 * @param cluster cluster to use 2728 * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it 2729 * is set. 2730 * @throws IllegalStateException if the passed cluster is up when it is required to be down 2731 * @throws IOException if the FileSystem could not be set from the passed dfs cluster 2732 */ 2733 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown) 2734 throws IllegalStateException, IOException { 2735 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) { 2736 throw new IllegalStateException("DFSCluster is already running! 
Shut it down first."); 2737 } 2738 this.dfsCluster = cluster; 2739 this.setFs(); 2740 } 2741 2742 public FileSystem getTestFileSystem() throws IOException { 2743 return HFileSystem.get(conf); 2744 } 2745 2746 /** 2747 * Wait until all regions in a table have been assigned. Waits default timeout before giving up 2748 * (30 seconds). 2749 * @param table Table to wait on. 2750 */ 2751 public void waitTableAvailable(TableName table) throws InterruptedException, IOException { 2752 waitTableAvailable(table.getName(), 30000); 2753 } 2754 2755 public void waitTableAvailable(TableName table, long timeoutMillis) 2756 throws InterruptedException, IOException { 2757 waitFor(timeoutMillis, predicateTableAvailable(table)); 2758 } 2759 2760 /** 2761 * Wait until all regions in a table have been assigned 2762 * @param table Table to wait on. 2763 * @param timeoutMillis Timeout. 2764 */ 2765 public void waitTableAvailable(byte[] table, long timeoutMillis) 2766 throws InterruptedException, IOException { 2767 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table))); 2768 } 2769 2770 public String explainTableAvailability(TableName tableName) throws IOException { 2771 StringBuilder msg = 2772 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", "); 2773 if (getHBaseCluster().getMaster().isAlive()) { 2774 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager() 2775 .getRegionStates().getRegionAssignments(); 2776 final List<Pair<RegionInfo, ServerName>> metaLocations = 2777 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName); 2778 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) { 2779 RegionInfo hri = metaLocation.getFirst(); 2780 ServerName sn = metaLocation.getSecond(); 2781 if (!assignments.containsKey(hri)) { 2782 msg.append(", region ").append(hri) 2783 .append(" not assigned, but found in meta, it expected to be on ").append(sn); 2784 } else if (sn == null) { 2785 msg.append(", region ").append(hri).append(" assigned, but has no server in meta"); 2786 } else if (!sn.equals(assignments.get(hri))) { 2787 msg.append(", region ").append(hri) 2788 .append(" assigned, but has different servers in meta and AM ( ").append(sn) 2789 .append(" <> ").append(assignments.get(hri)); 2790 } 2791 } 2792 } 2793 return msg.toString(); 2794 } 2795 2796 public String explainTableState(final TableName table, TableState.State state) 2797 throws IOException { 2798 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); 2799 if (tableState == null) { 2800 return "TableState in META: No table state in META for table " + table 2801 + " last state in meta (including deleted is " + findLastTableState(table) + ")"; 2802 } else if (!tableState.inStates(state)) { 2803 return "TableState in META: Not " + state + " state, but " + tableState; 2804 } else { 2805 return "TableState in META: OK"; 2806 } 2807 } 2808 2809 @Nullable 2810 public TableState findLastTableState(final TableName table) throws IOException { 2811 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null); 2812 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 2813 @Override 2814 public boolean visit(Result r) throws IOException { 2815 if (!Arrays.equals(r.getRow(), table.getName())) { 2816 return false; 2817 } 2818 TableState state = CatalogFamilyFormat.getTableState(r); 2819 if (state != null) { 2820 lastTableState.set(state); 2821 } 2822 return true; 2823 } 2824 }; 2825 
MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, 2826 Integer.MAX_VALUE, visitor); 2827 return lastTableState.get(); 2828 } 2829 2830 /** 2831 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2832 * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent 2833 * table. 2834 * @param table the table to wait on. 2835 * @throws InterruptedException if interrupted while waiting 2836 * @throws IOException if an IO problem is encountered 2837 */ 2838 public void waitTableEnabled(TableName table) throws InterruptedException, IOException { 2839 waitTableEnabled(table, 30000); 2840 } 2841 2842 /** 2843 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2844 * have been all assigned. 2845 * @see #waitTableEnabled(TableName, long) 2846 * @param table Table to wait on. 2847 * @param timeoutMillis Time to wait on it being marked enabled. 2848 */ 2849 public void waitTableEnabled(byte[] table, long timeoutMillis) 2850 throws InterruptedException, IOException { 2851 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2852 } 2853 2854 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2855 waitFor(timeoutMillis, predicateTableEnabled(table)); 2856 } 2857 2858 /** 2859 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 2860 * after default period (30 seconds) 2861 * @param table Table to wait on. 2862 */ 2863 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2864 waitTableDisabled(table, 30000); 2865 } 2866 2867 public void waitTableDisabled(TableName table, long millisTimeout) 2868 throws InterruptedException, IOException { 2869 waitFor(millisTimeout, predicateTableDisabled(table)); 2870 } 2871 2872 /** 2873 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 2874 * @param table Table to wait on. 2875 * @param timeoutMillis Time to wait on it being marked disabled. 2876 */ 2877 public void waitTableDisabled(byte[] table, long timeoutMillis) 2878 throws InterruptedException, IOException { 2879 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2880 } 2881 2882 /** 2883 * Make sure that at least the specified number of region servers are running 2884 * @param num minimum number of region servers that should be running 2885 * @return true if we started some servers 2886 */ 2887 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2888 boolean startedServer = false; 2889 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2890 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2891 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2892 startedServer = true; 2893 } 2894 2895 return startedServer; 2896 } 2897 2898 /** 2899 * Make sure that at least the specified number of region servers are running. We don't count the 2900 * ones that are currently stopping or are stopped. 
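 * <p>
 * Sketch: a test that has aborted a server and wants at least three live, non-stopping region
 * servers before continuing ({@code tableName} is the table under test):
 * <pre>
 * TEST_UTIL.ensureSomeNonStoppedRegionServersAvailable(3);
 * TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
 * </pre>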
2901 * @param num minimum number of region servers that should be running 2902 * @return true if we started some servers 2903 */ 2904 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2905 boolean startedServer = ensureSomeRegionServersAvailable(num); 2906 2907 int nonStoppedServers = 0; 2908 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2909 2910 HRegionServer hrs = rst.getRegionServer(); 2911 if (hrs.isStopping() || hrs.isStopped()) { 2912 LOG.info("A region server is stopped or stopping:" + hrs); 2913 } else { 2914 nonStoppedServers++; 2915 } 2916 } 2917 for (int i = nonStoppedServers; i < num; ++i) { 2918 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2919 startedServer = true; 2920 } 2921 return startedServer; 2922 } 2923 2924 /** 2925 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 2926 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 2927 * @param c Initial configuration 2928 * @param differentiatingSuffix Suffix to differentiate this user from others. 2929 * @return A new configuration instance with a different user set into it. 2930 */ 2931 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2932 throws IOException { 2933 FileSystem currentfs = FileSystem.get(c); 2934 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2935 return User.getCurrent(); 2936 } 2937 // Else distributed filesystem. Make a new instance per daemon. Below 2938 // code is taken from the AppendTestUtil over in hdfs. 2939 String username = User.getCurrent().getName() + differentiatingSuffix; 2940 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2941 return user; 2942 } 2943 2944 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2945 throws IOException { 2946 NavigableSet<String> online = new TreeSet<>(); 2947 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2948 try { 2949 for (RegionInfo region : ProtobufUtil 2950 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2951 online.add(region.getRegionNameAsString()); 2952 } 2953 } catch (RegionServerStoppedException e) { 2954 // That's fine. 2955 } 2956 } 2957 return online; 2958 } 2959 2960 /** 2961 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests 2962 * linger. Here is the exception you'll see: 2963 * 2964 * <pre> 2965 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2966 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2967 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2968 * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2969 * </pre> 2970 * 2971 * @param stream A DFSClient.DFSOutputStream. 
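* @param max The value to set as the maximum recovery error count; applied to the stream's maxRecoveryErrorCount field via reflection.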
2972 */ 2973 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 2974 try { 2975 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 2976 for (Class<?> clazz : clazzes) { 2977 String className = clazz.getSimpleName(); 2978 if (className.equals("DFSOutputStream")) { 2979 if (clazz.isInstance(stream)) { 2980 Field maxRecoveryErrorCountField = 2981 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 2982 maxRecoveryErrorCountField.setAccessible(true); 2983 maxRecoveryErrorCountField.setInt(stream, max); 2984 break; 2985 } 2986 } 2987 } 2988 } catch (Exception e) { 2989 LOG.info("Could not set max recovery field", e); 2990 } 2991 } 2992 2993 /** 2994 * Uses the assignment manager directly to assign the region, and waits until the specified region 2995 * has completed assignment. 2996 * @return true if the region is assigned, false otherwise. 2997 */ 2998 public boolean assignRegion(final RegionInfo regionInfo) 2999 throws IOException, InterruptedException { 3000 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3001 am.assign(regionInfo); 3002 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3003 } 3004 3005 /** 3006 * Move region to destination server and wait until the region is completely moved and online. 3007 * @param destRegion region to move 3008 * @param destServer destination server of the region 3009 */ 3010 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3011 throws InterruptedException, IOException { 3012 HMaster master = getMiniHBaseCluster().getMaster(); 3013 // TODO: Here we start the move. The move can take a while. 3014 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3015 while (true) { 3016 ServerName serverName = 3017 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3018 if (serverName != null && serverName.equals(destServer)) { 3019 assertRegionOnServer(destRegion, serverName, 2000); 3020 break; 3021 } 3022 Thread.sleep(10); 3023 } 3024 } 3025 3026 /** 3027 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3028 * configurable timeout value (default is 60 seconds). This means all regions have been deployed, 3029 * the master has been informed, and hbase:meta has been updated with each region's deployed server. 3030 * @param tableName the table name 3031 */ 3032 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3033 waitUntilAllRegionsAssigned(tableName, 3034 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3035 } 3036 3037 /** 3038 * Wait until all system tables' regions get assigned. 3039 */ 3040 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3041 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3042 } 3043 3044 /** 3045 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3046 * timeout. This means all regions have been deployed, the master has been informed, and 3047 * hbase:meta has been updated with each region's deployed server. 3048 * @param tableName the table name 3049 * @param timeout timeout, in milliseconds 3050 */ 3051 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3052 throws IOException { 3053 if (!TableName.isMetaTableName(tableName)) { 3054 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3055 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. 
Timeout = " 3056 + timeout + "ms"); 3057 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3058 @Override 3059 public String explainFailure() throws IOException { 3060 return explainTableAvailability(tableName); 3061 } 3062 3063 @Override 3064 public boolean evaluate() throws IOException { 3065 Scan scan = new Scan(); 3066 scan.addFamily(HConstants.CATALOG_FAMILY); 3067 boolean tableFound = false; 3068 try (ResultScanner s = meta.getScanner(scan)) { 3069 for (Result r; (r = s.next()) != null;) { 3070 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3071 RegionInfo info = RegionInfo.parseFromOrNull(b); 3072 if (info != null && info.getTable().equals(tableName)) { 3073 // Get server hosting this region from catalog family. Return false if no server 3074 // hosting this region, or if the server hosting this region was recently killed 3075 // (for fault tolerance testing). 3076 tableFound = true; 3077 byte[] server = 3078 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3079 if (server == null) { 3080 return false; 3081 } else { 3082 byte[] startCode = 3083 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3084 ServerName serverName = 3085 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3086 + Bytes.toLong(startCode)); 3087 if ( 3088 !getHBaseClusterInterface().isDistributedCluster() 3089 && getHBaseCluster().isKilledRS(serverName) 3090 ) { 3091 return false; 3092 } 3093 } 3094 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3095 return false; 3096 } 3097 } 3098 } 3099 } 3100 if (!tableFound) { 3101 LOG.warn( 3102 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3103 } 3104 return tableFound; 3105 } 3106 }); 3107 } 3108 } 3109 LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states."); 3110 // check from the master state if we are using a mini cluster 3111 if (!getHBaseClusterInterface().isDistributedCluster()) { 3112 // So, all regions are in the meta table but make sure master knows of the assignments before 3113 // returning -- sometimes this can lag. 3114 HMaster master = getHBaseCluster().getMaster(); 3115 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3116 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3117 @Override 3118 public String explainFailure() throws IOException { 3119 return explainTableAvailability(tableName); 3120 } 3121 3122 @Override 3123 public boolean evaluate() throws IOException { 3124 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3125 return hris != null && !hris.isEmpty(); 3126 } 3127 }); 3128 } 3129 LOG.info("All regions for table " + tableName + " assigned."); 3130 } 3131 3132 /** 3133 * Do a small get/scan against one store. This is required because store has no actual methods of 3134 * querying itself, and relies on StoreScanner. 3135 */ 3136 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3137 Scan scan = new Scan(get); 3138 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3139 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3140 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3141 // readpoint 0. 
3142 0); 3143 3144 List<Cell> result = new ArrayList<>(); 3145 scanner.next(result); 3146 if (!result.isEmpty()) { 3147 // verify that we are on the row we want: 3148 Cell kv = result.get(0); 3149 if (!CellUtil.matchingRows(kv, get.getRow())) { 3150 result.clear(); 3151 } 3152 } 3153 scanner.close(); 3154 return result; 3155 } 3156 3157 /** 3158 * Create region split keys between startkey and endKey 3159 * @param numRegions the number of regions to be created. it has to be greater than 3. 3160 * @return resulting split keys 3161 */ 3162 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3163 assertTrue(numRegions > 3); 3164 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3165 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3166 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3167 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3168 return result; 3169 } 3170 3171 /** 3172 * Do a small get/scan against one store. This is required because store has no actual methods of 3173 * querying itself, and relies on StoreScanner. 3174 */ 3175 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3176 throws IOException { 3177 Get get = new Get(row); 3178 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3179 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3180 3181 return getFromStoreFile(store, get); 3182 } 3183 3184 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3185 final List<? extends Cell> actual) { 3186 final int eLen = expected.size(); 3187 final int aLen = actual.size(); 3188 final int minLen = Math.min(eLen, aLen); 3189 3190 int i = 0; 3191 while ( 3192 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3193 ) { 3194 i++; 3195 } 3196 3197 if (additionalMsg == null) { 3198 additionalMsg = ""; 3199 } 3200 if (!additionalMsg.isEmpty()) { 3201 additionalMsg = ". " + additionalMsg; 3202 } 3203 3204 if (eLen != aLen || i != minLen) { 3205 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3206 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. 
" + safeGetAsStr(actual, i) 3207 + " (length " + aLen + ")" + additionalMsg); 3208 } 3209 } 3210 3211 public static <T> String safeGetAsStr(List<T> lst, int i) { 3212 if (0 <= i && i < lst.size()) { 3213 return lst.get(i).toString(); 3214 } else { 3215 return "<out_of_range>"; 3216 } 3217 } 3218 3219 public String getClusterKey() { 3220 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3221 + ":" 3222 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3223 } 3224 3225 /** 3226 * Creates a random table with the given parameters 3227 */ 3228 public Table createRandomTable(TableName tableName, final Collection<String> families, 3229 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3230 final int numRowsPerFlush) throws IOException, InterruptedException { 3231 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3232 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3233 + maxVersions + "\n"); 3234 3235 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3236 final int numCF = families.size(); 3237 final byte[][] cfBytes = new byte[numCF][]; 3238 { 3239 int cfIndex = 0; 3240 for (String cf : families) { 3241 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3242 } 3243 } 3244 3245 final int actualStartKey = 0; 3246 final int actualEndKey = Integer.MAX_VALUE; 3247 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3248 final int splitStartKey = actualStartKey + keysPerRegion; 3249 final int splitEndKey = actualEndKey - keysPerRegion; 3250 final String keyFormat = "%08x"; 3251 final Table table = createTable(tableName, cfBytes, maxVersions, 3252 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3253 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3254 3255 if (hbaseCluster != null) { 3256 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3257 } 3258 3259 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3260 3261 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3262 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3263 final byte[] row = Bytes.toBytes( 3264 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3265 3266 Put put = new Put(row); 3267 Delete del = new Delete(row); 3268 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3269 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3270 final long ts = rand.nextInt(); 3271 final byte[] qual = Bytes.toBytes("col" + iCol); 3272 if (rand.nextBoolean()) { 3273 final byte[] value = 3274 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3275 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3276 put.addColumn(cf, qual, ts, value); 3277 } else if (rand.nextDouble() < 0.8) { 3278 del.addColumn(cf, qual, ts); 3279 } else { 3280 del.addColumns(cf, qual, ts); 3281 } 3282 } 3283 3284 if (!put.isEmpty()) { 3285 mutator.mutate(put); 3286 } 3287 3288 if (!del.isEmpty()) { 3289 mutator.mutate(del); 3290 } 3291 } 3292 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3293 mutator.flush(); 3294 if (hbaseCluster != null) { 3295 getMiniHBaseCluster().flushcache(table.getName()); 3296 } 3297 } 3298 mutator.close(); 3299 3300 return table; 3301 } 3302 3303 public static int randomFreePort() { 3304 return HBaseCommonTestingUtil.randomFreePort(); 3305 } 3306 3307 public 
static String randomMultiCastAddress() { 3308 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3309 } 3310 3311 public static void waitForHostPort(String host, int port) throws IOException { 3312 final int maxTimeMs = 10000; 3313 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3314 IOException savedException = null; 3315 LOG.info("Waiting for server at " + host + ":" + port); 3316 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3317 try { 3318 Socket sock = new Socket(InetAddress.getByName(host), port); 3319 sock.close(); 3320 savedException = null; 3321 LOG.info("Server at " + host + ":" + port + " is available"); 3322 break; 3323 } catch (UnknownHostException e) { 3324 throw new IOException("Failed to look up " + host, e); 3325 } catch (IOException e) { 3326 savedException = e; 3327 } 3328 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3329 } 3330 3331 if (savedException != null) { 3332 throw savedException; 3333 } 3334 } 3335 3336 /** 3337 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3338 * continues. 3339 * @return the number of regions the table was split into 3340 */ 3341 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3342 byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding) 3343 throws IOException { 3344 return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression, 3345 dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT); 3346 } 3347 3348 /** 3349 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3350 * continues. 3351 * @return the number of regions the table was split into 3352 */ 3353 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3354 byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding, 3355 int numRegionsPerServer, int regionReplication, Durability durability) throws IOException { 3356 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 3357 builder.setDurability(durability); 3358 builder.setRegionReplication(regionReplication); 3359 ColumnFamilyDescriptorBuilder cfBuilder = 3360 ColumnFamilyDescriptorBuilder.newBuilder(columnFamily); 3361 cfBuilder.setDataBlockEncoding(dataBlockEncoding); 3362 cfBuilder.setCompressionType(compression); 3363 return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(), 3364 numRegionsPerServer); 3365 } 3366 3367 /** 3368 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3369 * continues. 
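* All column families are created with the same compression and data block encoding settings.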
3370 * @return the number of regions the table was split into 3371 */ 3372 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3373 byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding, 3374 int numRegionsPerServer, int regionReplication, Durability durability) throws IOException { 3375 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 3376 builder.setDurability(durability); 3377 builder.setRegionReplication(regionReplication); 3378 ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length]; 3379 for (int i = 0; i < columnFamilies.length; i++) { 3380 ColumnFamilyDescriptorBuilder cfBuilder = 3381 ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]); 3382 cfBuilder.setDataBlockEncoding(dataBlockEncoding); 3383 cfBuilder.setCompressionType(compression); 3384 hcds[i] = cfBuilder.build(); 3385 } 3386 return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer); 3387 } 3388 3389 /** 3390 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3391 * continues. 3392 * @return the number of regions the table was split into 3393 */ 3394 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3395 ColumnFamilyDescriptor hcd) throws IOException { 3396 return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER); 3397 } 3398 3399 /** 3400 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3401 * continues. 3402 * @return the number of regions the table was split into 3403 */ 3404 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3405 ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException { 3406 return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd }, 3407 numRegionsPerServer); 3408 } 3409 3410 /** 3411 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3412 * continues. 3413 * @return the number of regions the table was split into 3414 */ 3415 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3416 ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException { 3417 return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(), 3418 numRegionsPerServer); 3419 } 3420 3421 /** 3422 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3423 * continues. 3424 * @return the number of regions the table was split into 3425 */ 3426 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td, 3427 ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer) 3428 throws IOException { 3429 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td); 3430 for (ColumnFamilyDescriptor cd : cds) { 3431 if (!td.hasColumnFamily(cd.getName())) { 3432 builder.setColumnFamily(cd); 3433 } 3434 } 3435 td = builder.build(); 3436 int totalNumberOfRegions = 0; 3437 Connection unmanagedConnection = ConnectionFactory.createConnection(conf); 3438 Admin admin = unmanagedConnection.getAdmin(); 3439 3440 try { 3441 // create a table a pre-splits regions. 3442 // The number of splits is set as: 3443 // region servers * regions per region server). 
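// The split algorithm returns (totalNumberOfRegions - 1) split keys; createTable then creates totalNumberOfRegions regions from them.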
3444 int numberOfServers = admin.getRegionServers().size(); 3445 if (numberOfServers == 0) { 3446 throw new IllegalStateException("No live regionservers"); 3447 } 3448 3449 totalNumberOfRegions = numberOfServers * numRegionsPerServer; 3450 LOG.info("Number of live regionservers: " + numberOfServers + ", " 3451 + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: " 3452 + numRegionsPerServer + ")"); 3453 3454 byte[][] splits = splitter.split(totalNumberOfRegions); 3455 3456 admin.createTable(td, splits); 3457 } catch (MasterNotRunningException e) { 3458 LOG.error("Master not running", e); 3459 throw new IOException(e); 3460 } catch (TableExistsException e) { 3461 LOG.warn("Table " + td.getTableName() + " already exists, continuing"); 3462 } finally { 3463 admin.close(); 3464 unmanagedConnection.close(); 3465 } 3466 return totalNumberOfRegions; 3467 } 3468 3469 public static int getMetaRSPort(Connection connection) throws IOException { 3470 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3471 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3472 } 3473 } 3474 3475 /** 3476 * Due to async racing issue, a region may not be in the online region list of a region server 3477 * yet, after the assignment znode is deleted and the new assignment is recorded in master. 3478 */ 3479 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3480 final long timeout) throws IOException, InterruptedException { 3481 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3482 while (true) { 3483 List<RegionInfo> regions = getAdmin().getRegions(server); 3484 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3485 long now = EnvironmentEdgeManager.currentTime(); 3486 if (now > timeoutTime) break; 3487 Thread.sleep(10); 3488 } 3489 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3490 } 3491 3492 /** 3493 * Check to make sure the region is open on the specified region server, but not on any other one. 
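* Fails if the region is not found on {@code server} within {@code timeout} milliseconds, or if it is also open on another region server.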
3494 */ 3495 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3496 final long timeout) throws IOException, InterruptedException { 3497 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3498 while (true) { 3499 List<RegionInfo> regions = getAdmin().getRegions(server); 3500 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3501 List<JVMClusterUtil.RegionServerThread> rsThreads = 3502 getHBaseCluster().getLiveRegionServerThreads(); 3503 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3504 HRegionServer rs = rsThread.getRegionServer(); 3505 if (server.equals(rs.getServerName())) { 3506 continue; 3507 } 3508 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3509 for (HRegion r : hrs) { 3510 assertTrue("Region should not be double assigned", 3511 r.getRegionInfo().getRegionId() != hri.getRegionId()); 3512 } 3513 } 3514 return; // good, we are happy 3515 } 3516 long now = EnvironmentEdgeManager.currentTime(); 3517 if (now > timeoutTime) break; 3518 Thread.sleep(10); 3519 } 3520 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3521 } 3522 3523 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException { 3524 TableDescriptor td = 3525 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3526 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3527 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3528 } 3529 3530 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3531 BlockCache blockCache) throws IOException { 3532 TableDescriptor td = 3533 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3534 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3535 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3536 } 3537 3538 public static void setFileSystemURI(String fsURI) { 3539 FS_URI = fsURI; 3540 } 3541 3542 /** 3543 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3544 */ 3545 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3546 return new ExplainingPredicate<IOException>() { 3547 @Override 3548 public String explainFailure() throws IOException { 3549 final RegionStates regionStates = 3550 getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 3551 return "found in transition: " + regionStates.getRegionsInTransition().toString(); 3552 } 3553 3554 @Override 3555 public boolean evaluate() throws IOException { 3556 HMaster master = getMiniHBaseCluster().getMaster(); 3557 if (master == null) return false; 3558 AssignmentManager am = master.getAssignmentManager(); 3559 if (am == null) return false; 3560 return !am.hasRegionsInTransition(); 3561 } 3562 }; 3563 } 3564 3565 /** 3566 * Returns a {@link Predicate} for checking that table is enabled 3567 */ 3568 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3569 return new ExplainingPredicate<IOException>() { 3570 @Override 3571 public String explainFailure() throws IOException { 3572 return explainTableState(tableName, TableState.State.ENABLED); 3573 } 3574 3575 @Override 3576 public boolean evaluate() throws IOException { 3577 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3578 
} 3579 }; 3580 } 3581 3582 /** 3583 * Returns a {@link Predicate} for checking that table is disabled 3584 */ 3585 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3586 return new ExplainingPredicate<IOException>() { 3587 @Override 3588 public String explainFailure() throws IOException { 3589 return explainTableState(tableName, TableState.State.DISABLED); 3590 } 3591 3592 @Override 3593 public boolean evaluate() throws IOException { 3594 return getAdmin().isTableDisabled(tableName); 3595 } 3596 }; 3597 } 3598 3599 /** 3600 * Returns a {@link Predicate} for checking that table is available 3601 */ 3602 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3603 return new ExplainingPredicate<IOException>() { 3604 @Override 3605 public String explainFailure() throws IOException { 3606 return explainTableAvailability(tableName); 3607 } 3608 3609 @Override 3610 public boolean evaluate() throws IOException { 3611 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3612 if (tableAvailable) { 3613 try (Table table = getConnection().getTable(tableName)) { 3614 TableDescriptor htd = table.getDescriptor(); 3615 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3616 .getAllRegionLocations()) { 3617 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3618 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3619 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3620 for (byte[] family : htd.getColumnFamilyNames()) { 3621 scan.addFamily(family); 3622 } 3623 try (ResultScanner scanner = table.getScanner(scan)) { 3624 scanner.next(); 3625 } 3626 } 3627 } 3628 } 3629 return tableAvailable; 3630 } 3631 }; 3632 } 3633 3634 /** 3635 * Wait until no regions are in transition. 3636 * @param timeout How long to wait. 3637 */ 3638 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3639 waitFor(timeout, predicateNoRegionsInTransition()); 3640 } 3641 3642 /** 3643 * Wait until no regions are in transition (time limit 15 minutes). 3644 */ 3645 public void waitUntilNoRegionsInTransition() throws IOException { 3646 waitUntilNoRegionsInTransition(15 * 60000); 3647 } 3648 3649 /** 3650 * Wait until the labels are ready in VisibilityLabelsCache. 3651 */ 3652 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3653 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3654 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3655 3656 @Override 3657 public boolean evaluate() { 3658 for (String label : labels) { 3659 if (labelsCache.getLabelOrdinal(label) == 0) { 3660 return false; 3661 } 3662 } 3663 return true; 3664 } 3665 3666 @Override 3667 public String explainFailure() { 3668 for (String label : labels) { 3669 if (labelsCache.getLabelOrdinal(label) == 0) { 3670 return label + " is not available yet"; 3671 } 3672 } 3673 return ""; 3674 } 3675 }); 3676 } 3677 3678 /** 3679 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3680 * available. 3681 * @return the list of column descriptors 3682 */ 3683 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3684 return generateColumnDescriptors(""); 3685 } 3686 3687 /** 3688 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3689 * available.
3690 * @param prefix family names prefix 3691 * @return the list of column descriptors 3692 */ 3693 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3694 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3695 long familyId = 0; 3696 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3697 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3698 for (BloomType bloomType : BloomType.values()) { 3699 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3700 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3701 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3702 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3703 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3704 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3705 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3706 familyId++; 3707 } 3708 } 3709 } 3710 return columnFamilyDescriptors; 3711 } 3712 3713 /** 3714 * Get supported compression algorithms. 3715 * @return supported compression algorithms. 3716 */ 3717 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3718 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3719 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3720 for (String algoName : allAlgos) { 3721 try { 3722 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3723 algo.getCompressor(); 3724 supportedAlgos.add(algo); 3725 } catch (Throwable t) { 3726 // this algo is not available 3727 } 3728 } 3729 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3730 } 3731 3732 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3733 Scan scan = new Scan().withStartRow(row); 3734 scan.setReadType(ReadType.PREAD); 3735 scan.setCaching(1); 3736 scan.setReversed(true); 3737 scan.addFamily(family); 3738 try (RegionScanner scanner = r.getScanner(scan)) { 3739 List<Cell> cells = new ArrayList<>(1); 3740 scanner.next(cells); 3741 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3742 return null; 3743 } 3744 return Result.create(cells); 3745 } 3746 } 3747 3748 private boolean isTargetTable(final byte[] inRow, Cell c) { 3749 String inputRowString = Bytes.toString(inRow); 3750 int i = inputRowString.indexOf(HConstants.DELIMITER); 3751 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3752 int o = outputRowString.indexOf(HConstants.DELIMITER); 3753 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3754 } 3755 3756 /** 3757 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3758 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3759 * kerby KDC server and utility for using it, 3760 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 3761 * less baggage. It came in in HBASE-5291. 3762 */ 3763 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3764 Properties conf = MiniKdc.createConf(); 3765 conf.put(MiniKdc.DEBUG, true); 3766 MiniKdc kdc = null; 3767 File dir = null; 3768 // There is time lag between selecting a port and trying to bind with it. 
It's possible that 3769 // another service captures the port in between which'll result in BindException. 3770 boolean bindException; 3771 int numTries = 0; 3772 do { 3773 try { 3774 bindException = false; 3775 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3776 kdc = new MiniKdc(conf, dir); 3777 kdc.start(); 3778 } catch (BindException e) { 3779 FileUtils.deleteDirectory(dir); // clean directory 3780 numTries++; 3781 if (numTries == 3) { 3782 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3783 throw e; 3784 } 3785 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3786 bindException = true; 3787 } 3788 } while (bindException); 3789 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3790 return kdc; 3791 } 3792 3793 public int getNumHFiles(final TableName tableName, final byte[] family) { 3794 int numHFiles = 0; 3795 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3796 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3797 } 3798 return numHFiles; 3799 } 3800 3801 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3802 final byte[] family) { 3803 int numHFiles = 0; 3804 for (Region region : rs.getRegions(tableName)) { 3805 numHFiles += region.getStore(family).getStorefilesCount(); 3806 } 3807 return numHFiles; 3808 } 3809 3810 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3811 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3812 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3813 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3814 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3815 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3816 it2 = rtdFamilies.iterator(); it.hasNext();) { 3817 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3818 } 3819 } 3820 3821 /** 3822 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3823 * invocations. 3824 */ 3825 public static void await(final long sleepMillis, final BooleanSupplier condition) 3826 throws InterruptedException { 3827 try { 3828 while (!condition.getAsBoolean()) { 3829 Thread.sleep(sleepMillis); 3830 } 3831 } catch (RuntimeException e) { 3832 if (e.getCause() instanceof AssertionError) { 3833 throw (AssertionError) e.getCause(); 3834 } 3835 throw e; 3836 } 3837 } 3838}