001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.File; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.io.UncheckedIOException; 030import java.lang.reflect.Field; 031import java.net.BindException; 032import java.net.DatagramSocket; 033import java.net.InetAddress; 034import java.net.ServerSocket; 035import java.net.Socket; 036import java.net.UnknownHostException; 037import java.nio.charset.StandardCharsets; 038import java.security.MessageDigest; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.HashSet; 044import java.util.Iterator; 045import java.util.List; 046import java.util.Map; 047import java.util.NavigableSet; 048import java.util.Properties; 049import java.util.Random; 050import java.util.Set; 051import java.util.TreeSet; 052import 
java.util.concurrent.ExecutionException; 053import java.util.concurrent.ThreadLocalRandom; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicReference; 056import java.util.function.BooleanSupplier; 057import org.apache.commons.io.FileUtils; 058import org.apache.commons.lang3.RandomStringUtils; 059import org.apache.hadoop.conf.Configuration; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 063import org.apache.hadoop.hbase.Waiter.Predicate; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.AsyncAdmin; 066import org.apache.hadoop.hbase.client.AsyncClusterConnection; 067import org.apache.hadoop.hbase.client.BufferedMutator; 068import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.Consistency; 074import org.apache.hadoop.hbase.client.Delete; 075import org.apache.hadoop.hbase.client.Durability; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.Hbck; 078import org.apache.hadoop.hbase.client.MasterRegistry; 079import org.apache.hadoop.hbase.client.Put; 080import org.apache.hadoop.hbase.client.RegionInfo; 081import org.apache.hadoop.hbase.client.RegionInfoBuilder; 082import org.apache.hadoop.hbase.client.RegionLocator; 083import org.apache.hadoop.hbase.client.Result; 084import org.apache.hadoop.hbase.client.ResultScanner; 085import org.apache.hadoop.hbase.client.Scan; 086import org.apache.hadoop.hbase.client.Scan.ReadType; 087import org.apache.hadoop.hbase.client.Table; 088import org.apache.hadoop.hbase.client.TableDescriptor; 089import 
org.apache.hadoop.hbase.client.TableDescriptorBuilder; 090import org.apache.hadoop.hbase.client.TableState; 091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 092import org.apache.hadoop.hbase.fs.HFileSystem; 093import org.apache.hadoop.hbase.io.compress.Compression; 094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 096import org.apache.hadoop.hbase.io.hfile.BlockCache; 097import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 098import org.apache.hadoop.hbase.io.hfile.HFile; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 101import org.apache.hadoop.hbase.master.HMaster; 102import org.apache.hadoop.hbase.master.MasterFileSystem; 103import org.apache.hadoop.hbase.master.RegionState; 104import org.apache.hadoop.hbase.master.ServerManager; 105import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 107import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 108import org.apache.hadoop.hbase.master.assignment.RegionStates; 109import org.apache.hadoop.hbase.mob.MobFileCache; 110import org.apache.hadoop.hbase.regionserver.BloomType; 111import org.apache.hadoop.hbase.regionserver.ChunkCreator; 112import org.apache.hadoop.hbase.regionserver.HRegion; 113import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 114import org.apache.hadoop.hbase.regionserver.HRegionServer; 115import org.apache.hadoop.hbase.regionserver.HStore; 116import org.apache.hadoop.hbase.regionserver.InternalScanner; 117import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 118import org.apache.hadoop.hbase.regionserver.Region; 119import org.apache.hadoop.hbase.regionserver.RegionScanner; 120import org.apache.hadoop.hbase.regionserver.RegionServerServices; 121import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 
122import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 123import org.apache.hadoop.hbase.security.User; 124import org.apache.hadoop.hbase.security.UserProvider; 125import org.apache.hadoop.hbase.security.access.AccessController; 126import org.apache.hadoop.hbase.security.access.PermissionStorage; 127import org.apache.hadoop.hbase.security.access.SecureTestUtil; 128import org.apache.hadoop.hbase.security.token.TokenProvider; 129import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 130import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 131import org.apache.hadoop.hbase.util.Bytes; 132import org.apache.hadoop.hbase.util.CommonFSUtils; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 134import org.apache.hadoop.hbase.util.FSUtils; 135import org.apache.hadoop.hbase.util.JVM; 136import org.apache.hadoop.hbase.util.JVMClusterUtil; 137import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 138import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 139import org.apache.hadoop.hbase.util.Pair; 140import org.apache.hadoop.hbase.util.RetryCounter; 141import org.apache.hadoop.hbase.util.Threads; 142import org.apache.hadoop.hbase.wal.WAL; 143import org.apache.hadoop.hbase.wal.WALFactory; 144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 145import org.apache.hadoop.hbase.zookeeper.ZKConfig; 146import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 147import org.apache.hadoop.hdfs.DFSClient; 148import org.apache.hadoop.hdfs.DFSConfigKeys; 149import org.apache.hadoop.hdfs.DistributedFileSystem; 150import org.apache.hadoop.hdfs.MiniDFSCluster; 151import org.apache.hadoop.hdfs.server.datanode.DataNode; 152import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 153import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 154import org.apache.hadoop.mapred.JobConf; 155import org.apache.hadoop.mapred.MiniMRCluster; 156import org.apache.hadoop.minikdc.MiniKdc; 157import 
org.apache.yetus.audience.InterfaceAudience; 158import org.apache.yetus.audience.InterfaceStability; 159import org.apache.zookeeper.WatchedEvent; 160import org.apache.zookeeper.ZooKeeper; 161import org.apache.zookeeper.ZooKeeper.States; 162 163import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 164 165import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 166 167/** 168 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 169 * functionality. Create an instance and keep it around testing HBase. 170 * <p/> 171 * This class is meant to be your one-stop shop for anything you might need testing. Manages one 172 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster}, 173 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real 174 * cluster. 175 * <p/> 176 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration. 177 * <p/> 178 * It does not set logging levels. 179 * <p/> 180 * In the configuration properties, default values for master-info-port and region-server-port are 181 * overridden such that a random port will be assigned (thus avoiding port contention if another 182 * local HBase instance is already running). 183 * <p/> 184 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" 185 * setting it to true. 186 */ 187@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 188@InterfaceStability.Evolving 189public class HBaseTestingUtil extends HBaseZKTestingUtil { 190 191 public static final int DEFAULT_REGIONS_PER_SERVER = 3; 192 193 private MiniDFSCluster dfsCluster = null; 194 private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null; 195 196 private volatile HBaseClusterInterface hbaseCluster = null; 197 private MiniMRCluster mrCluster = null; 198 199 /** If there is a mini cluster running for this testing utility instance. 
*/ 200 private volatile boolean miniClusterRunning; 201 202 private String hadoopLogDir; 203 204 /** 205 * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility 206 */ 207 private Path dataTestDirOnTestFS = null; 208 209 private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>(); 210 211 /** Filesystem URI used for map-reduce mini-cluster setup */ 212 private static String FS_URI; 213 214 /** This is for unit tests parameterized with a single boolean. */ 215 public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination(); 216 217 /** 218 * Checks to see if a specific port is available. 219 * @param port the port number to check for availability 220 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 221 */ 222 public static boolean available(int port) { 223 ServerSocket ss = null; 224 DatagramSocket ds = null; 225 try { 226 ss = new ServerSocket(port); 227 ss.setReuseAddress(true); 228 ds = new DatagramSocket(port); 229 ds.setReuseAddress(true); 230 return true; 231 } catch (IOException e) { 232 // Do nothing 233 } finally { 234 if (ds != null) { 235 ds.close(); 236 } 237 238 if (ss != null) { 239 try { 240 ss.close(); 241 } catch (IOException e) { 242 /* should not be thrown */ 243 } 244 } 245 } 246 247 return false; 248 } 249 250 /** 251 * Create all combinations of Bloom filters and compression algorithms for testing. 
252 */ 253 private static List<Object[]> bloomAndCompressionCombinations() { 254 List<Object[]> configurations = new ArrayList<>(); 255 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 256 for (BloomType bloomType : BloomType.values()) { 257 configurations.add(new Object[] { comprAlgo, bloomType }); 258 } 259 } 260 return Collections.unmodifiableList(configurations); 261 } 262 263 /** 264 * Create combination of memstoreTS and tags 265 */ 266 private static List<Object[]> memStoreTSAndTagsCombination() { 267 List<Object[]> configurations = new ArrayList<>(); 268 configurations.add(new Object[] { false, false }); 269 configurations.add(new Object[] { false, true }); 270 configurations.add(new Object[] { true, false }); 271 configurations.add(new Object[] { true, true }); 272 return Collections.unmodifiableList(configurations); 273 } 274 275 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 276 List<Object[]> configurations = new ArrayList<>(); 277 configurations.add(new Object[] { false, false, true }); 278 configurations.add(new Object[] { false, false, false }); 279 configurations.add(new Object[] { false, true, true }); 280 configurations.add(new Object[] { false, true, false }); 281 configurations.add(new Object[] { true, false, true }); 282 configurations.add(new Object[] { true, false, false }); 283 configurations.add(new Object[] { true, true, true }); 284 configurations.add(new Object[] { true, true, false }); 285 return Collections.unmodifiableList(configurations); 286 } 287 288 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 289 bloomAndCompressionCombinations(); 290 291 /** 292 * <p> 293 * Create an HBaseTestingUtility using a default configuration. 294 * <p> 295 * Initially, all tmp files are written to a local test data directory. 
Once 296 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 297 * data will be written to the DFS directory instead. 298 */ 299 public HBaseTestingUtil() { 300 this(HBaseConfiguration.create()); 301 } 302 303 /** 304 * <p> 305 * Create an HBaseTestingUtility using a given configuration. 306 * <p> 307 * Initially, all tmp files are written to a local test data directory. Once 308 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 309 * data will be written to the DFS directory instead. 310 * @param conf The configuration to use for further operations 311 */ 312 public HBaseTestingUtil(@Nullable Configuration conf) { 313 super(conf); 314 315 // a hbase checksum verification failure will cause unit tests to fail 316 ChecksumUtil.generateExceptionForChecksumFailureForTest(true); 317 318 // Save this for when setting default file:// breaks things 319 if (this.conf.get("fs.defaultFS") != null) { 320 this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS")); 321 } 322 if (this.conf.get(HConstants.HBASE_DIR) != null) { 323 this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR)); 324 } 325 // Every cluster is a local cluster until we start DFS 326 // Note that conf could be null, but this.conf will not be 327 String dataTestDir = getDataTestDir().toString(); 328 this.conf.set("fs.defaultFS", "file:///"); 329 this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir); 330 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 331 this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 332 // If the value for random ports isn't set set it to true, thus making 333 // tests opt-out for random port assignment 334 this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, 335 this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true)); 336 } 337 338 /** 339 * Close both the region {@code r} and it's underlying WAL. 
For use in tests. 340 */ 341 public static void closeRegionAndWAL(final Region r) throws IOException { 342 closeRegionAndWAL((HRegion) r); 343 } 344 345 /** 346 * Close both the HRegion {@code r} and it's underlying WAL. For use in tests. 347 */ 348 public static void closeRegionAndWAL(final HRegion r) throws IOException { 349 if (r == null) return; 350 r.close(); 351 if (r.getWAL() == null) return; 352 r.getWAL().close(); 353 } 354 355 /** 356 * Start mini secure cluster with given kdc and principals. 357 * @param kdc Mini kdc server 358 * @param servicePrincipal Service principal without realm. 359 * @param spnegoPrincipal Spnego principal without realm. 360 * @return Handler to shutdown the cluster 361 */ 362 public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal, 363 String spnegoPrincipal) throws Exception { 364 Configuration conf = getConfiguration(); 365 366 SecureTestUtil.enableSecurity(conf); 367 VisibilityTestUtil.enableVisiblityLabels(conf); 368 SecureTestUtil.verifyConfiguration(conf); 369 370 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 371 AccessController.class.getName() + ',' + TokenProvider.class.getName()); 372 373 HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(), 374 spnegoPrincipal + '@' + kdc.getRealm()); 375 376 startMiniCluster(); 377 try { 378 waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); 379 } catch (Exception e) { 380 shutdownMiniCluster(); 381 throw e; 382 } 383 384 return this::shutdownMiniCluster; 385 } 386 387 /** 388 * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned 389 * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed 390 * by the Configuration. If say, a Connection was being used against a cluster that had been 391 * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome. 
392 * Rather than use the return direct, its usually best to make a copy and use that. Do 393 * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code> 394 * @return Instance of Configuration. 395 */ 396 @Override 397 public Configuration getConfiguration() { 398 return super.getConfiguration(); 399 } 400 401 public void setHBaseCluster(HBaseClusterInterface hbaseCluster) { 402 this.hbaseCluster = hbaseCluster; 403 } 404 405 /** 406 * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can 407 * have many concurrent tests running if we need to. Moding a System property is not the way to do 408 * concurrent instances -- another instance could grab the temporary value unintentionally -- but 409 * not anything can do about it at moment; single instance only is how the minidfscluster works. 410 * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir 411 * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir 412 * (We do not create them!). 413 * @return The calculated data test build directory, if newly-created. 
414 */ 415 @Override 416 protected Path setupDataTestDir() { 417 Path testPath = super.setupDataTestDir(); 418 if (null == testPath) { 419 return null; 420 } 421 422 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 423 424 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 425 // we want our own value to ensure uniqueness on the same machine 426 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 427 428 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 429 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 430 return testPath; 431 } 432 433 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 434 435 String sysValue = System.getProperty(propertyName); 436 437 if (sysValue != null) { 438 // There is already a value set. So we do nothing but hope 439 // that there will be no conflicts 440 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 441 + " so I do NOT create it in " + parent); 442 String confValue = conf.get(propertyName); 443 if (confValue != null && !confValue.endsWith(sysValue)) { 444 LOG.warn(propertyName + " property value differs in configuration and system: " 445 + "Configuration=" + confValue + " while System=" + sysValue 446 + " Erasing configuration value by system value."); 447 } 448 conf.set(propertyName, sysValue); 449 } else { 450 // Ok, it's not set, so we create it as a subdirectory 451 createSubDir(propertyName, parent, subDirName); 452 System.setProperty(propertyName, conf.get(propertyName)); 453 } 454 } 455 456 /** 457 * @return Where to write test data on the test filesystem; Returns working directory for the test 458 * filesystem by default 459 * @see #setupDataTestDirOnTestFS() 460 * @see #getTestFileSystem() 461 */ 462 private Path getBaseTestDirOnTestFS() throws IOException { 463 FileSystem fs = getTestFileSystem(); 464 return new 
Path(fs.getWorkingDirectory(), "test-data"); 465 } 466 467 /** 468 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 469 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 470 * on it. 471 * @return a unique path in the test filesystem 472 */ 473 public Path getDataTestDirOnTestFS() throws IOException { 474 if (dataTestDirOnTestFS == null) { 475 setupDataTestDirOnTestFS(); 476 } 477 478 return dataTestDirOnTestFS; 479 } 480 481 /** 482 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 483 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 484 * on it. 485 * @return a unique path in the test filesystem 486 * @param subdirName name of the subdir to create under the base test dir 487 */ 488 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 489 return new Path(getDataTestDirOnTestFS(), subdirName); 490 } 491 492 /** 493 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 494 * setup. 495 */ 496 private void setupDataTestDirOnTestFS() throws IOException { 497 if (dataTestDirOnTestFS != null) { 498 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 499 return; 500 } 501 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 502 } 503 504 /** 505 * Sets up a new path in test filesystem to be used by tests. 506 */ 507 private Path getNewDataTestDirOnTestFS() throws IOException { 508 // The file system can be either local, mini dfs, or if the configuration 509 // is supplied externally, it can be an external cluster FS. 
If it is a local 510 // file system, the tests should use getBaseTestDir, otherwise, we can use 511 // the working directory, and create a unique sub dir there 512 FileSystem fs = getTestFileSystem(); 513 Path newDataTestDir; 514 String randomStr = getRandomUUID().toString(); 515 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 516 newDataTestDir = new Path(getDataTestDir(), randomStr); 517 File dataTestDir = new File(newDataTestDir.toString()); 518 if (deleteOnExit()) dataTestDir.deleteOnExit(); 519 } else { 520 Path base = getBaseTestDirOnTestFS(); 521 newDataTestDir = new Path(base, randomStr); 522 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 523 } 524 return newDataTestDir; 525 } 526 527 /** 528 * Cleans the test data directory on the test filesystem. 529 * @return True if we removed the test dirs 530 */ 531 public boolean cleanupDataTestDirOnTestFS() throws IOException { 532 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 533 if (ret) { 534 dataTestDirOnTestFS = null; 535 } 536 return ret; 537 } 538 539 /** 540 * Cleans a subdirectory under the test data directory on the test filesystem. 541 * @return True if we removed child 542 */ 543 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 544 Path cpath = getDataTestDirOnTestFS(subdirName); 545 return getTestFileSystem().delete(cpath, true); 546 } 547 548 /** 549 * Start a minidfscluster. 550 * @param servers How many DNs to start. 551 * @see #shutdownMiniDFSCluster() 552 * @return The mini dfs cluster created. 553 */ 554 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 555 return startMiniDFSCluster(servers, null); 556 } 557 558 /** 559 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 560 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 561 * instances of the datanodes will have the same host name. 
562 * @param hosts hostnames DNs to run on. 563 * @see #shutdownMiniDFSCluster() 564 * @return The mini dfs cluster created. 565 */ 566 public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception { 567 if (hosts != null && hosts.length != 0) { 568 return startMiniDFSCluster(hosts.length, hosts); 569 } else { 570 return startMiniDFSCluster(1, null); 571 } 572 } 573 574 /** 575 * Start a minidfscluster. Can only create one. 576 * @param servers How many DNs to start. 577 * @param hosts hostnames DNs to run on. 578 * @see #shutdownMiniDFSCluster() 579 * @return The mini dfs cluster created. 580 */ 581 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception { 582 return startMiniDFSCluster(servers, null, hosts); 583 } 584 585 private void setFs() throws IOException { 586 if (this.dfsCluster == null) { 587 LOG.info("Skipping setting fs because dfsCluster is null"); 588 return; 589 } 590 FileSystem fs = this.dfsCluster.getFileSystem(); 591 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 592 593 // re-enable this check with dfs 594 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 595 } 596 597 // Workaround to avoid IllegalThreadStateException 598 // See HBASE-27148 for more details 599 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 600 601 private volatile boolean stopped = false; 602 603 private final MiniDFSCluster cluster; 604 605 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 606 super("FsDatasetAsyncDiskServiceFixer"); 607 setDaemon(true); 608 this.cluster = cluster; 609 } 610 611 @Override 612 public void run() { 613 while (!stopped) { 614 try { 615 Thread.sleep(30000); 616 } catch (InterruptedException e) { 617 Thread.currentThread().interrupt(); 618 continue; 619 } 620 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 621 // timeout of the thread pool executor is 60 seconds by default. 
622 try { 623 for (DataNode dn : cluster.getDataNodes()) { 624 FsDatasetSpi<?> dataset = dn.getFSDataset(); 625 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 626 service.setAccessible(true); 627 Object asyncDiskService = service.get(dataset); 628 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 629 group.setAccessible(true); 630 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 631 if (threadGroup.isDaemon()) { 632 threadGroup.setDaemon(false); 633 } 634 } 635 } catch (NoSuchFieldException e) { 636 LOG.debug("NoSuchFieldException: " + e.getMessage() 637 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 638 + "See HBASE-27595 for details."); 639 } catch (Exception e) { 640 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 641 } 642 } 643 } 644 645 void shutdown() { 646 stopped = true; 647 interrupt(); 648 } 649 } 650 651 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts) 652 throws Exception { 653 createDirsAndSetProperties(); 654 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 655 656 this.dfsCluster = 657 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 658 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 659 this.dfsClusterFixer.start(); 660 // Set this just-started cluster as our filesystem. 
661 setFs(); 662 663 // Wait for the cluster to be totally up 664 this.dfsCluster.waitClusterUp(); 665 666 // reset the test directory for test file system 667 dataTestDirOnTestFS = null; 668 String dataTestDir = getDataTestDir().toString(); 669 conf.set(HConstants.HBASE_DIR, dataTestDir); 670 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 671 672 return this.dfsCluster; 673 } 674 675 public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { 676 createDirsAndSetProperties(); 677 dfsCluster = 678 new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); 679 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 680 this.dfsClusterFixer.start(); 681 return dfsCluster; 682 } 683 684 /** 685 * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to 686 * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in 687 * the conf. 
688 * 689 * <pre> 690 * Configuration conf = TEST_UTIL.getConfiguration(); 691 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 692 * Map.Entry<String, String> e = i.next(); 693 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 694 * } 695 * </pre> 696 */ 697 private void createDirsAndSetProperties() throws IOException { 698 setupClusterTestDir(); 699 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 700 createDirAndSetProperty("test.cache.data"); 701 createDirAndSetProperty("hadoop.tmp.dir"); 702 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 703 createDirAndSetProperty("mapreduce.cluster.local.dir"); 704 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 705 enableShortCircuit(); 706 707 Path root = getDataTestDirOnTestFS("hadoop"); 708 conf.set(MapreduceTestingShim.getMROutputDirProp(), 709 new Path(root, "mapred-output-dir").toString()); 710 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 711 conf.set("mapreduce.jobtracker.staging.root.dir", 712 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 713 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 714 conf.set("yarn.app.mapreduce.am.staging-dir", 715 new Path(root, "mapreduce-am-staging-root-dir").toString()); 716 717 // Frustrate yarn's and hdfs's attempts at writing /tmp. 718 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 
719 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 720 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 721 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 722 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 723 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 724 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 725 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 726 createDirAndSetProperty("dfs.journalnode.edits.dir"); 727 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 728 createDirAndSetProperty("nfs.dump.dir"); 729 createDirAndSetProperty("java.io.tmpdir"); 730 createDirAndSetProperty("dfs.journalnode.edits.dir"); 731 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 732 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 733 734 // disable metrics logger since it depend on commons-logging internal classes and we do not want 735 // commons-logging on our classpath 736 conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 737 conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 738 } 739 740 /** 741 * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families. 742 * Default to false. 743 */ 744 public boolean isNewVersionBehaviorEnabled() { 745 final String propName = "hbase.tests.new.version.behavior"; 746 String v = System.getProperty(propName); 747 if (v != null) { 748 return Boolean.parseBoolean(v); 749 } 750 return false; 751 } 752 753 /** 754 * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This 755 * allows to specify this parameter on the command line. If not set, default is true. 
756 */ 757 public boolean isReadShortCircuitOn() { 758 final String propName = "hbase.tests.use.shortcircuit.reads"; 759 String readOnProp = System.getProperty(propName); 760 if (readOnProp != null) { 761 return Boolean.parseBoolean(readOnProp); 762 } else { 763 return conf.getBoolean(propName, false); 764 } 765 } 766 767 /** 768 * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings, 769 * including skipping the hdfs checksum checks. 770 */ 771 private void enableShortCircuit() { 772 if (isReadShortCircuitOn()) { 773 String curUser = System.getProperty("user.name"); 774 LOG.info("read short circuit is ON for user " + curUser); 775 // read short circuit, for hdfs 776 conf.set("dfs.block.local-path-access.user", curUser); 777 // read short circuit, for hbase 778 conf.setBoolean("dfs.client.read.shortcircuit", true); 779 // Skip checking checksum, for the hdfs client and the datanode 780 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true); 781 } else { 782 LOG.info("read short circuit is OFF"); 783 } 784 } 785 786 private String createDirAndSetProperty(final String property) { 787 return createDirAndSetProperty(property, property); 788 } 789 790 private String createDirAndSetProperty(final String relPath, String property) { 791 String path = getDataTestDir(relPath).toString(); 792 System.setProperty(property, path); 793 conf.set(property, path); 794 new File(path).mkdirs(); 795 LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); 796 return path; 797 } 798 799 /** 800 * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing. 801 */ 802 public void shutdownMiniDFSCluster() throws IOException { 803 if (this.dfsCluster != null) { 804 // The below throws an exception per dn, AsynchronousCloseException. 
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      // Drop the cached data test dir; it lived on the filesystem of the cluster just shut down.
      dataTestDirOnTestFS = null;
      // Point the default filesystem back at the local one now that HDFS is gone.
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * value can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
845 * @see #shutdownMiniDFSCluster() 846 */ 847 public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option) 848 throws Exception { 849 LOG.info("Starting up minicluster with option: {}", option); 850 851 // If we already put up a cluster, fail. 852 if (miniClusterRunning) { 853 throw new IllegalStateException("A mini-cluster is already running"); 854 } 855 miniClusterRunning = true; 856 857 setupClusterTestDir(); 858 859 // Bring up mini dfs cluster. This spews a bunch of warnings about missing 860 // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. 861 if (dfsCluster == null) { 862 LOG.info("STARTING DFS"); 863 dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts()); 864 } else { 865 LOG.info("NOT STARTING DFS"); 866 } 867 868 // Start up a zk cluster. 869 if (getZkCluster() == null) { 870 startMiniZKCluster(option.getNumZkServers()); 871 } 872 873 // Start the MiniHBaseCluster 874 return startMiniHBaseCluster(option); 875 } 876 877 /** 878 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 879 * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters. 880 * @return Reference to the hbase mini hbase cluster. 881 * @see #startMiniCluster(StartTestingClusterOption) 882 * @see #shutdownMiniHBaseCluster() 883 */ 884 public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option) 885 throws IOException, InterruptedException { 886 // Now do the mini hbase cluster. Set the hbase.rootdir in config. 887 createRootDir(option.isCreateRootDir()); 888 if (option.isCreateWALDir()) { 889 createWALRootDir(); 890 } 891 // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is 892 // for tests that do not read hbase-defaults.xml 893 setHBaseFsTmpDir(); 894 895 // These settings will make the server waits until this exact number of 896 // regions servers are connected. 
897 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { 898 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers()); 899 } 900 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) { 901 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers()); 902 } 903 904 Configuration c = new Configuration(this.conf); 905 this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(), 906 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 907 option.getMasterClass(), option.getRsClass()); 908 // Populate the master address configuration from mini cluster configuration. 909 conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c)); 910 // Don't leave here till we've done a successful scan of the hbase:meta 911 try (Table t = getConnection().getTable(TableName.META_TABLE_NAME); 912 ResultScanner s = t.getScanner(new Scan())) { 913 for (;;) { 914 if (s.next() == null) { 915 break; 916 } 917 } 918 } 919 920 getAdmin(); // create immediately the hbaseAdmin 921 LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster()); 922 923 return (SingleProcessHBaseCluster) hbaseCluster; 924 } 925 926 /** 927 * Starts up mini hbase cluster using default options. Default options can be found in 928 * {@link StartTestingClusterOption.Builder}. 929 * @see #startMiniHBaseCluster(StartTestingClusterOption) 930 * @see #shutdownMiniHBaseCluster() 931 */ 932 public SingleProcessHBaseCluster startMiniHBaseCluster() 933 throws IOException, InterruptedException { 934 return startMiniHBaseCluster(StartTestingClusterOption.builder().build()); 935 } 936 937 /** 938 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 939 * {@link #startMiniCluster()}. All other options will use default values, defined in 940 * {@link StartTestingClusterOption.Builder}. 
941 * @param numMasters Master node number. 942 * @param numRegionServers Number of region servers. 943 * @return The mini HBase cluster created. 944 * @see #shutdownMiniHBaseCluster() 945 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 946 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 947 * @see #startMiniHBaseCluster(StartTestingClusterOption) 948 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 949 */ 950 @Deprecated 951 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers) 952 throws IOException, InterruptedException { 953 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 954 .numRegionServers(numRegionServers).build(); 955 return startMiniHBaseCluster(option); 956 } 957 958 /** 959 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 960 * {@link #startMiniCluster()}. All other options will use default values, defined in 961 * {@link StartTestingClusterOption.Builder}. 962 * @param numMasters Master node number. 963 * @param numRegionServers Number of region servers. 964 * @param rsPorts Ports that RegionServer should use. 965 * @return The mini HBase cluster created. 966 * @see #shutdownMiniHBaseCluster() 967 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 968 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 
969 * @see #startMiniHBaseCluster(StartTestingClusterOption) 970 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 971 */ 972 @Deprecated 973 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 974 List<Integer> rsPorts) throws IOException, InterruptedException { 975 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 976 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 977 return startMiniHBaseCluster(option); 978 } 979 980 /** 981 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 982 * {@link #startMiniCluster()}. All other options will use default values, defined in 983 * {@link StartTestingClusterOption.Builder}. 984 * @param numMasters Master node number. 985 * @param numRegionServers Number of region servers. 986 * @param rsPorts Ports that RegionServer should use. 987 * @param masterClass The class to use as HMaster, or null for default. 988 * @param rsClass The class to use as HRegionServer, or null for default. 989 * @param createRootDir Whether to create a new root or data directory path. 990 * @param createWALDir Whether to create a new WAL directory. 991 * @return The mini HBase cluster created. 992 * @see #shutdownMiniHBaseCluster() 993 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 994 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 995 * @see #startMiniHBaseCluster(StartTestingClusterOption) 996 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 997 */ 998 @Deprecated 999 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1000 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 1001 Class<? 
extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass, 1002 boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException { 1003 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 1004 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 1005 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 1006 return startMiniHBaseCluster(option); 1007 } 1008 1009 /** 1010 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 1011 * want to keep dfs/zk up and just stop/start hbase. 1012 * @param servers number of region servers 1013 */ 1014 public void restartHBaseCluster(int servers) throws IOException, InterruptedException { 1015 this.restartHBaseCluster(servers, null); 1016 } 1017 1018 public void restartHBaseCluster(int servers, List<Integer> ports) 1019 throws IOException, InterruptedException { 1020 StartTestingClusterOption option = 1021 StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); 1022 restartHBaseCluster(option); 1023 invalidateConnection(); 1024 } 1025 1026 public void restartHBaseCluster(StartTestingClusterOption option) 1027 throws IOException, InterruptedException { 1028 closeConnection(); 1029 this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(), 1030 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 1031 option.getMasterClass(), option.getRsClass()); 1032 // Don't leave here till we've done a successful scan of the hbase:meta 1033 Connection conn = ConnectionFactory.createConnection(this.conf); 1034 Table t = conn.getTable(TableName.META_TABLE_NAME); 1035 ResultScanner s = t.getScanner(new Scan()); 1036 while (s.next() != null) { 1037 // do nothing 1038 } 1039 LOG.info("HBase has been restarted"); 1040 s.close(); 1041 t.close(); 1042 conn.close(); 1043 } 1044 1045 /** 1046 
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    // HBase is taken down first, before the zk and dfs services it runs on.
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1093 * @throws java.io.IOException throws in case command is unsuccessful 1094 */ 1095 public void killMiniHBaseCluster() throws IOException { 1096 cleanup(); 1097 if (this.hbaseCluster != null) { 1098 getMiniHBaseCluster().killAll(); 1099 this.hbaseCluster = null; 1100 } 1101 if (zooKeeperWatcher != null) { 1102 zooKeeperWatcher.close(); 1103 zooKeeperWatcher = null; 1104 } 1105 } 1106 1107 // close hbase admin, close current connection and reset MIN MAX configs for RS. 1108 private void cleanup() throws IOException { 1109 closeConnection(); 1110 // unset the configuration for MIN and MAX RS to start 1111 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 1112 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1); 1113 } 1114 1115 /** 1116 * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true, 1117 * a new root directory path is fetched irrespective of whether it has been fetched before or not. 1118 * If false, previous path is used. Note: this does not cause the root dir to be created. 1119 * @return Fully qualified path for the default hbase root dir 1120 */ 1121 public Path getDefaultRootDirPath(boolean create) throws IOException { 1122 if (!create) { 1123 return getDataTestDirOnTestFS(); 1124 } else { 1125 return getNewDataTestDirOnTestFS(); 1126 } 1127 } 1128 1129 /** 1130 * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that 1131 * <code>create</code> flag is false. Note: this does not cause the root dir to be created. 1132 * @return Fully qualified path for the default hbase root dir 1133 */ 1134 public Path getDefaultRootDirPath() throws IOException { 1135 return getDefaultRootDirPath(false); 1136 } 1137 1138 /** 1139 * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you 1140 * won't make use of this method. Root hbasedir is created for you as part of mini cluster 1141 * startup. 
You'd only use this method if you were doing manual operation. 1142 * @param create This flag decides whether to get a new root or data directory path or not, if it 1143 * has been fetched already. Note : Directory will be made irrespective of whether 1144 * path has been fetched or not. If directory already exists, it will be overwritten 1145 * @return Fully qualified path to hbase root dir 1146 */ 1147 public Path createRootDir(boolean create) throws IOException { 1148 FileSystem fs = FileSystem.get(this.conf); 1149 Path hbaseRootdir = getDefaultRootDirPath(create); 1150 CommonFSUtils.setRootDir(this.conf, hbaseRootdir); 1151 fs.mkdirs(hbaseRootdir); 1152 FSUtils.setVersion(fs, hbaseRootdir); 1153 return hbaseRootdir; 1154 } 1155 1156 /** 1157 * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code> 1158 * flag is false. 1159 * @return Fully qualified path to hbase root dir 1160 */ 1161 public Path createRootDir() throws IOException { 1162 return createRootDir(false); 1163 } 1164 1165 /** 1166 * Creates a hbase walDir in the user's home directory. Normally you won't make use of this 1167 * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use 1168 * this method if you were doing manual operation. 
1169 * @return Fully qualified path to hbase root dir 1170 */ 1171 public Path createWALRootDir() throws IOException { 1172 FileSystem fs = FileSystem.get(this.conf); 1173 Path walDir = getNewDataTestDirOnTestFS(); 1174 CommonFSUtils.setWALRootDir(this.conf, walDir); 1175 fs.mkdirs(walDir); 1176 return walDir; 1177 } 1178 1179 private void setHBaseFsTmpDir() throws IOException { 1180 String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir"); 1181 if (hbaseFsTmpDirInString == null) { 1182 this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString()); 1183 LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir")); 1184 } else { 1185 LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString); 1186 } 1187 } 1188 1189 /** 1190 * Flushes all caches in the mini hbase cluster 1191 */ 1192 public void flush() throws IOException { 1193 getMiniHBaseCluster().flushcache(); 1194 } 1195 1196 /** 1197 * Flushes all caches in the mini hbase cluster 1198 */ 1199 public void flush(TableName tableName) throws IOException { 1200 getMiniHBaseCluster().flushcache(tableName); 1201 } 1202 1203 /** 1204 * Compact all regions in the mini hbase cluster 1205 */ 1206 public void compact(boolean major) throws IOException { 1207 getMiniHBaseCluster().compact(major); 1208 } 1209 1210 /** 1211 * Compact all of a table's reagion in the mini hbase cluster 1212 */ 1213 public void compact(TableName tableName, boolean major) throws IOException { 1214 getMiniHBaseCluster().compact(tableName, major); 1215 } 1216 1217 /** 1218 * Create a table. 1219 * @return A Table instance for the created table. 1220 */ 1221 public Table createTable(TableName tableName, String family) throws IOException { 1222 return createTable(tableName, new String[] { family }); 1223 } 1224 1225 /** 1226 * Create a table. 1227 * @return A Table instance for the created table. 
1228 */ 1229 public Table createTable(TableName tableName, String[] families) throws IOException { 1230 List<byte[]> fams = new ArrayList<>(families.length); 1231 for (String family : families) { 1232 fams.add(Bytes.toBytes(family)); 1233 } 1234 return createTable(tableName, fams.toArray(new byte[0][])); 1235 } 1236 1237 /** 1238 * Create a table. 1239 * @return A Table instance for the created table. 1240 */ 1241 public Table createTable(TableName tableName, byte[] family) throws IOException { 1242 return createTable(tableName, new byte[][] { family }); 1243 } 1244 1245 /** 1246 * Create a table with multiple regions. 1247 * @return A Table instance for the created table. 1248 */ 1249 public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions) 1250 throws IOException { 1251 if (numRegions < 3) throw new IOException("Must create at least 3 regions"); 1252 byte[] startKey = Bytes.toBytes("aaaaa"); 1253 byte[] endKey = Bytes.toBytes("zzzzz"); 1254 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 1255 1256 return createTable(tableName, new byte[][] { family }, splitKeys); 1257 } 1258 1259 /** 1260 * Create a table. 1261 * @return A Table instance for the created table. 1262 */ 1263 public Table createTable(TableName tableName, byte[][] families) throws IOException { 1264 return createTable(tableName, families, (byte[][]) null); 1265 } 1266 1267 /** 1268 * Create a table with multiple regions. 1269 * @return A Table instance for the created table. 1270 */ 1271 public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { 1272 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE); 1273 } 1274 1275 /** 1276 * Create a table with multiple regions. 1277 * @param replicaCount replica count. 1278 * @return A Table instance for the created table. 
1279 */ 1280 public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families) 1281 throws IOException { 1282 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount); 1283 } 1284 1285 /** 1286 * Create a table. 1287 * @return A Table instance for the created table. 1288 */ 1289 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) 1290 throws IOException { 1291 return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration())); 1292 } 1293 1294 /** 1295 * Create a table. 1296 * @param tableName the table name 1297 * @param families the families 1298 * @param splitKeys the splitkeys 1299 * @param replicaCount the region replica count 1300 * @return A Table instance for the created table. 1301 * @throws IOException throws IOException 1302 */ 1303 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1304 int replicaCount) throws IOException { 1305 return createTable(tableName, families, splitKeys, replicaCount, 1306 new Configuration(getConfiguration())); 1307 } 1308 1309 public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey, 1310 byte[] endKey, int numRegions) throws IOException { 1311 TableDescriptor desc = createTableDescriptor(tableName, families, numVersions); 1312 1313 getAdmin().createTable(desc, startKey, endKey, numRegions); 1314 // HBaseAdmin only waits for regions to appear in hbase:meta we 1315 // should wait until they are assigned 1316 waitUntilAllRegionsAssigned(tableName); 1317 return getConnection().getTable(tableName); 1318 } 1319 1320 /** 1321 * Create a table. 1322 * @param c Configuration to use 1323 * @return A Table instance for the created table. 1324 */ 1325 public Table createTable(TableDescriptor htd, byte[][] families, Configuration c) 1326 throws IOException { 1327 return createTable(htd, families, null, c); 1328 } 1329 1330 /** 1331 * Create a table. 
1332 * @param htd table descriptor 1333 * @param families array of column families 1334 * @param splitKeys array of split keys 1335 * @param c Configuration to use 1336 * @return A Table instance for the created table. 1337 * @throws IOException if getAdmin or createTable fails 1338 */ 1339 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1340 Configuration c) throws IOException { 1341 // Disable blooms (they are on by default as of 0.95) but we disable them here because 1342 // tests have hard coded counts of what to expect in block cache, etc., and blooms being 1343 // on is interfering. 1344 return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c); 1345 } 1346 1347 /** 1348 * Create a table. 1349 * @param htd table descriptor 1350 * @param families array of column families 1351 * @param splitKeys array of split keys 1352 * @param type Bloom type 1353 * @param blockSize block size 1354 * @param c Configuration to use 1355 * @return A Table instance for the created table. 
1356 * @throws IOException if getAdmin or createTable fails 1357 */ 1358 1359 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1360 BloomType type, int blockSize, Configuration c) throws IOException { 1361 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1362 for (byte[] family : families) { 1363 ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family) 1364 .setBloomFilterType(type).setBlocksize(blockSize); 1365 if (isNewVersionBehaviorEnabled()) { 1366 cfdb.setNewVersionBehavior(true); 1367 } 1368 builder.setColumnFamily(cfdb.build()); 1369 } 1370 TableDescriptor td = builder.build(); 1371 if (splitKeys != null) { 1372 getAdmin().createTable(td, splitKeys); 1373 } else { 1374 getAdmin().createTable(td); 1375 } 1376 // HBaseAdmin only waits for regions to appear in hbase:meta 1377 // we should wait until they are assigned 1378 waitUntilAllRegionsAssigned(td.getTableName()); 1379 return getConnection().getTable(td.getTableName()); 1380 } 1381 1382 /** 1383 * Create a table. 1384 * @param htd table descriptor 1385 * @param splitRows array of split keys 1386 * @return A Table instance for the created table. 
1387 */ 1388 public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException { 1389 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1390 if (isNewVersionBehaviorEnabled()) { 1391 for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) { 1392 builder.setColumnFamily( 1393 ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build()); 1394 } 1395 } 1396 if (splitRows != null) { 1397 getAdmin().createTable(builder.build(), splitRows); 1398 } else { 1399 getAdmin().createTable(builder.build()); 1400 } 1401 // HBaseAdmin only waits for regions to appear in hbase:meta 1402 // we should wait until they are assigned 1403 waitUntilAllRegionsAssigned(htd.getTableName()); 1404 return getConnection().getTable(htd.getTableName()); 1405 } 1406 1407 /** 1408 * Create a table. 1409 * @param tableName the table name 1410 * @param families the families 1411 * @param splitKeys the split keys 1412 * @param replicaCount the replica count 1413 * @param c Configuration to use 1414 * @return A Table instance for the created table. 1415 */ 1416 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1417 int replicaCount, final Configuration c) throws IOException { 1418 TableDescriptor htd = 1419 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build(); 1420 return createTable(htd, families, splitKeys, c); 1421 } 1422 1423 /** 1424 * Create a table. 1425 * @return A Table instance for the created table. 1426 */ 1427 public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException { 1428 return createTable(tableName, new byte[][] { family }, numVersions); 1429 } 1430 1431 /** 1432 * Create a table. 1433 * @return A Table instance for the created table. 
1434 */ 1435 public Table createTable(TableName tableName, byte[][] families, int numVersions) 1436 throws IOException { 1437 return createTable(tableName, families, numVersions, (byte[][]) null); 1438 } 1439 1440 /** 1441 * Create a table. 1442 * @return A Table instance for the created table. 1443 */ 1444 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1445 byte[][] splitKeys) throws IOException { 1446 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1447 for (byte[] family : families) { 1448 ColumnFamilyDescriptorBuilder cfBuilder = 1449 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1450 if (isNewVersionBehaviorEnabled()) { 1451 cfBuilder.setNewVersionBehavior(true); 1452 } 1453 builder.setColumnFamily(cfBuilder.build()); 1454 } 1455 if (splitKeys != null) { 1456 getAdmin().createTable(builder.build(), splitKeys); 1457 } else { 1458 getAdmin().createTable(builder.build()); 1459 } 1460 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1461 // assigned 1462 waitUntilAllRegionsAssigned(tableName); 1463 return getConnection().getTable(tableName); 1464 } 1465 1466 /** 1467 * Create a table with multiple regions. 1468 * @return A Table instance for the created table. 1469 */ 1470 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1471 throws IOException { 1472 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1473 } 1474 1475 /** 1476 * Create a table. 1477 * @return A Table instance for the created table. 
1478 */ 1479 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1480 throws IOException { 1481 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1482 for (byte[] family : families) { 1483 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1484 .setMaxVersions(numVersions).setBlocksize(blockSize); 1485 if (isNewVersionBehaviorEnabled()) { 1486 cfBuilder.setNewVersionBehavior(true); 1487 } 1488 builder.setColumnFamily(cfBuilder.build()); 1489 } 1490 getAdmin().createTable(builder.build()); 1491 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1492 // assigned 1493 waitUntilAllRegionsAssigned(tableName); 1494 return getConnection().getTable(tableName); 1495 } 1496 1497 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1498 String cpName) throws IOException { 1499 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1500 for (byte[] family : families) { 1501 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1502 .setMaxVersions(numVersions).setBlocksize(blockSize); 1503 if (isNewVersionBehaviorEnabled()) { 1504 cfBuilder.setNewVersionBehavior(true); 1505 } 1506 builder.setColumnFamily(cfBuilder.build()); 1507 } 1508 if (cpName != null) { 1509 builder.setCoprocessor(cpName); 1510 } 1511 getAdmin().createTable(builder.build()); 1512 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1513 // assigned 1514 waitUntilAllRegionsAssigned(tableName); 1515 return getConnection().getTable(tableName); 1516 } 1517 1518 /** 1519 * Create a table. 1520 * @return A Table instance for the created table. 
1521 */ 1522 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1523 throws IOException { 1524 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1525 int i = 0; 1526 for (byte[] family : families) { 1527 ColumnFamilyDescriptorBuilder cfBuilder = 1528 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1529 if (isNewVersionBehaviorEnabled()) { 1530 cfBuilder.setNewVersionBehavior(true); 1531 } 1532 builder.setColumnFamily(cfBuilder.build()); 1533 i++; 1534 } 1535 getAdmin().createTable(builder.build()); 1536 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1537 // assigned 1538 waitUntilAllRegionsAssigned(tableName); 1539 return getConnection().getTable(tableName); 1540 } 1541 1542 /** 1543 * Create a table. 1544 * @return A Table instance for the created table. 1545 */ 1546 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1547 throws IOException { 1548 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1549 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1550 if (isNewVersionBehaviorEnabled()) { 1551 cfBuilder.setNewVersionBehavior(true); 1552 } 1553 builder.setColumnFamily(cfBuilder.build()); 1554 getAdmin().createTable(builder.build(), splitRows); 1555 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1556 // assigned 1557 waitUntilAllRegionsAssigned(tableName); 1558 return getConnection().getTable(tableName); 1559 } 1560 1561 /** 1562 * Create a table with multiple regions. 1563 * @return A Table instance for the created table. 1564 */ 1565 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1566 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1567 } 1568 1569 /** 1570 * Set the number of Region replicas. 
   */
  public static void setReplicas(Admin admin, TableName table, int replicaCount)
    throws IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc);
  }

  /**
   * Set the number of Region replicas, using the async admin.
   */
  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
    throws ExecutionException, IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc).get();
  }

  /**
   * Drop an existing table, disabling it first if needed.
   * @param tableName existing table
   */
  public void deleteTable(TableName tableName) throws IOException {
    try {
      getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException e) {
      // Already disabled is fine; we only need it disabled before deletion.
      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
    }
    getAdmin().deleteTable(tableName);
  }

  /**
   * Drop an existing table, silently ignoring the case where it does not exist.
   * @param tableName existing table
   */
  public void deleteTableIfAny(TableName tableName) throws IOException {
    try {
      deleteTable(tableName);
    } catch (TableNotFoundException e) {
      // ignore
    }
  }

  // ==========================================================================
  // Canned table and table descriptor creation

  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
  private static final int MAXVERSIONS = 3;

  public static final char FIRST_CHAR = 'a';
  public static final char LAST_CHAR = 'z';
  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);

  /**
   * Create a modifyable table descriptor builder for the named table with the canned families
   * (fam1/fam2/fam3) and default min versions / TTL.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
    return createModifyableTableDescriptor(TableName.valueOf(name),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }

  /**
   * Create an immutable table descriptor carrying the canned families (fam1/fam2/fam3), each
   * configured with the given version/TTL settings and block cache disabled.
   */
  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
        .setBlockCacheEnabled(false).setTimeToLive(ttl);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder.build();
  }

  /**
   * Same as {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} but returns
   * the builder so callers can customize further before building.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
        .setBlockCacheEnabled(false).setTimeToLive(ttl);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder;
  }

  /**
   * Create a table descriptor of name <code>name</code> with the canned families and defaults.
   * @param name Name to give table.
   * @return Table descriptor.
   */
  public TableDescriptor createTableDescriptor(final TableName name) {
    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }

  /** Create a single-family table descriptor keeping one version. */
  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
    return createTableDescriptor(tableName, new byte[][] { family }, 1);
  }

  /** Create a table descriptor for the given families, each keeping {@code maxVersions}. */
  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
    int maxVersions) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder.build();
  }

  /**
   * Create an HRegion that writes to the local tmp dirs
   * @param desc     a table descriptor indicating which table the region belongs to
   * @param startKey the start boundary of the region
   * @param endKey   the end boundary of the region
   * @return a region that writes to local dir for testing
   */
  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
    throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
      .setEndKey(endKey).build();
    return createLocalHRegion(hri, desc);
  }

  /**
   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1709 */ 1710 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1711 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1712 } 1713 1714 /** 1715 * Create an HRegion that writes to the local tmp dirs with specified wal 1716 * @param info regioninfo 1717 * @param conf configuration 1718 * @param desc table descriptor 1719 * @param wal wal for this region. 1720 * @return created hregion 1721 */ 1722 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1723 WAL wal) throws IOException { 1724 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 1725 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1726 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1727 } 1728 1729 /** 1730 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} 1731 * when done. 1732 */ 1733 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1734 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1735 throws IOException { 1736 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1737 durability, wal, null, families); 1738 } 1739 1740 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1741 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1742 boolean[] compactedMemStore, byte[]... 
families) throws IOException { 1743 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1744 builder.setReadOnly(isReadOnly); 1745 int i = 0; 1746 for (byte[] family : families) { 1747 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1748 if (compactedMemStore != null && i < compactedMemStore.length) { 1749 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1750 } else { 1751 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1752 1753 } 1754 i++; 1755 // Set default to be three versions. 1756 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1757 builder.setColumnFamily(cfBuilder.build()); 1758 } 1759 builder.setDurability(durability); 1760 RegionInfo info = 1761 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1762 return createLocalHRegion(info, conf, builder.build(), wal); 1763 } 1764 1765 // 1766 // ========================================================================== 1767 1768 /** 1769 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1770 * read. 1771 * @param tableName existing table 1772 * @return HTable to that new table 1773 */ 1774 public Table deleteTableData(TableName tableName) throws IOException { 1775 Table table = getConnection().getTable(tableName); 1776 Scan scan = new Scan(); 1777 ResultScanner resScan = table.getScanner(scan); 1778 for (Result res : resScan) { 1779 Delete del = new Delete(res.getRow()); 1780 table.delete(del); 1781 } 1782 resScan = table.getScanner(scan); 1783 resScan.close(); 1784 return table; 1785 } 1786 1787 /** 1788 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1789 * table. 1790 * @param tableName table which must exist. 
   * @param preserveRegions keep the existing split points
   * @return HTable for the new table
   */
  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
    throws IOException {
    Admin admin = getAdmin();
    if (!admin.isTableDisabled(tableName)) {
      admin.disableTable(tableName);
    }
    admin.truncateTable(tableName, preserveRegions);
    return getConnection().getTable(tableName);
  }

  /**
   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
   * preserve regions of existing table.
   * @param tableName table which must exist.
   * @return HTable for the new table
   */
  public Table truncateTable(final TableName tableName) throws IOException {
    return truncateTable(tableName, false);
  }

  /**
   * Load table with rows from 'aaa' to 'zzz'.
   * @param t Table
   * @param f Family
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[] f) throws IOException {
    return loadTable(t, new byte[][] { f });
  }

  /**
   * Load table with rows from 'aaa' to 'zzz', optionally skipping the WAL.
   * @param t Table
   * @param f Family
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
    return loadTable(t, new byte[][] { f }, null, writeToWAL);
  }

  /**
   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
   * @param t Table
   * @param f Array of Families to load
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[][] f) throws IOException {
    return loadTable(t, f, null);
  }

  /**
   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
   * @param t     Table
   * @param f     Array of Families to load
   * @param value the values of the cells. If null is passed, the row key is used as value
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
    return loadTable(t, f, value, true);
  }

  /**
   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
   * @param t     Table
   * @param f     Array of Families to load
   * @param value the values of the cells. If null is passed, the row key is used as value
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
    throws IOException {
    List<Put> puts = new ArrayList<>();
    for (byte[] row : HBaseTestingUtil.ROWS) {
      Put put = new Put(row);
      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
      for (int i = 0; i < f.length; i++) {
        // Same qualifier as family; value falls back to the row key when none supplied.
        byte[] value1 = value != null ? value : row;
        put.addColumn(f[i], f[i], value1);
      }
      puts.add(put);
    }
    t.put(puts);
    return puts.size();
  }

  /**
   * A tracker for tracking and validating table rows generated with
   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
   */
  public static class SeenRowTracker {
    int dim = 'z' - 'a' + 1;
    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
    byte[] startRow;
    byte[] stopRow;

    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
      this.startRow = startRow;
      this.stopRow = stopRow;
    }

    // Zero all counters for the canned ROWS keyspace.
    void reset() {
      for (byte[] row : ROWS) {
        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
      }
    }

    // Map a row byte ('a'..'z') to a 0-based array index.
    int i(byte b) {
      return b - 'a';
    }

    public void addRow(byte[] row) {
      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
    }

    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
     * rows none
     */
    public void validate() {
      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
            int count = seenRows[i(b1)][i(b2)][i(b3)];
            int expectedCount = 0;
            if (
              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
            ) {
              expectedCount = 1;
            }
            if (count != expectedCount) {
              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
                + "instead of " + expectedCount);
            }
          }
        }
      }
    }
  }

  /** Load region with rows from 'aaa' to 'zzz' without flushing. */
  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
    return loadRegion(r, f, false);
  }

  /** Convenience overload accepting the Region interface. */
  public int loadRegion(final Region r, final byte[] f) throws IOException {
    return loadRegion((HRegion) r, f);
  }

  /**
   * Load region with rows from 'aaa' to 'zzz'.
   * @param r     Region
   * @param f     Family
   * @param flush flush the cache if true
   * @return Count of rows loaded.
   */
  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
          if (r.getWAL() == null) {
            put.setDurability(Durability.SKIP_WAL);
          }
          int preRowCount = rowCount;
          int pause = 10;
          int maxPause = 1000;
          // Retry with capped exponential backoff until the put succeeds.
          while (rowCount == preRowCount) {
            try {
              r.put(put);
              rowCount++;
            } catch (RegionTooBusyException e) {
              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
              Threads.sleep(pause);
            }
          }
        }
      }
      if (flush) {
        r.flush(true);
      }
    }
    return rowCount;
  }

  /** Put one row per integer in [startRow, endRow), using the stringified int as row key. */
  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
    throws IOException {
    for (int i = startRow; i < endRow; i++) {
      byte[] data = Bytes.toBytes(String.valueOf(i));
      Put put = new Put(data);
      put.addColumn(f, null, data);
      t.put(put);
    }
  }

  /** Put totalRows rows with random row keys of the given size. */
  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
    throws IOException {
    for (int i = 0; i < totalRows; i++) {
      byte[] row = new byte[rowSize];
      Bytes.random(row);
      Put put = new Put(row);
      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
      t.put(put);
    }
  }

  /**
   * Verify the rows written by {@link #loadNumericRows(Table, byte[], int, int)} via a TIMELINE
   * read against the given replica.
   */
  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
    int replicaId) throws IOException {
    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row :" + i;
      byte[] data = Bytes.toBytes(String.valueOf(i));
      Get get = new Get(data);
      get.setReplicaId(replicaId);
      get.setConsistency(Consistency.TIMELINE);
      Result result = table.get(get);
      assertTrue(result.containsColumn(f, null), failMsg);
      assertEquals(1, result.getColumnCells(f, null).size(), failMsg);
      Cell cell = result.getColumnLatestCell(f, null);
      assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength()), failMsg);
    }
  }

  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
    throws IOException {
    verifyNumericRows((HRegion) region, f, startRow, endRow);
  }

  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
    throws IOException {
    verifyNumericRows(region, f, startRow, endRow, true);
  }

  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
    final boolean present) throws IOException {
    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
  }

  /**
   * Verify each numeric row directly against the region; when {@code present} is false, assert
   * the rows are absent instead.
   */
  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
    final boolean present) throws IOException {
    for (int i = startRow; i < endRow; i++) {
      String failMsg = "Failed verification of row :" + i;
      byte[] data = Bytes.toBytes(String.valueOf(i));
      Result result = region.get(new Get(data));

      boolean hasResult = result != null && !result.isEmpty();
      assertEquals(present, hasResult, failMsg + result);
      if (!present) continue;

      assertTrue(result.containsColumn(f, null), failMsg);
      assertEquals(1, result.getColumnCells(f, null).size(), failMsg);
      Cell cell = result.getColumnLatestCell(f, null);
      assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength()), failMsg);
    }
  }

  /** Delete the whole family for each numeric row in [startRow, endRow). */
  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
    throws IOException {
    for (int i = startRow; i < endRow; i++) {
      byte[] data = Bytes.toBytes(String.valueOf(i));
      Delete delete = new Delete(data);
      delete.addFamily(f);
      t.delete(delete);
    }
  }

  /**
   * Return the number of rows in the given table.
   * @param table to count rows
   * @return count of rows
   */
  public static int countRows(final Table table) throws IOException {
    return countRows(table, new Scan());
  }

  /** Count rows returned by the given scan. */
  public static int countRows(final Table table, final Scan scan) throws IOException {
    try (ResultScanner results = table.getScanner(scan)) {
      int count = 0;
      while (results.next() != null) {
        count++;
      }
      return count;
    }
  }

  /** Count rows restricted to the given families. */
  public static int countRows(final Table table, final byte[]... families) throws IOException {
    Scan scan = new Scan();
    for (byte[] family : families) {
      scan.addFamily(family);
    }
    return countRows(table, scan);
  }

  /**
   * Return the number of rows in the given table.
2097 */ 2098 public int countRows(final TableName tableName) throws IOException { 2099 try (Table table = getConnection().getTable(tableName)) { 2100 return countRows(table); 2101 } 2102 } 2103 2104 public static int countRows(final Region region) throws IOException { 2105 return countRows(region, new Scan()); 2106 } 2107 2108 public static int countRows(final Region region, final Scan scan) throws IOException { 2109 try (InternalScanner scanner = region.getScanner(scan)) { 2110 return countRows(scanner); 2111 } 2112 } 2113 2114 public static int countRows(final InternalScanner scanner) throws IOException { 2115 int scannedCount = 0; 2116 List<Cell> results = new ArrayList<>(); 2117 boolean hasMore = true; 2118 while (hasMore) { 2119 hasMore = scanner.next(results); 2120 scannedCount += results.size(); 2121 results.clear(); 2122 } 2123 return scannedCount; 2124 } 2125 2126 /** 2127 * Return an md5 digest of the entire contents of a table. 2128 */ 2129 public String checksumRows(final Table table) throws Exception { 2130 MessageDigest digest = MessageDigest.getInstance("MD5"); 2131 try (ResultScanner results = table.getScanner(new Scan())) { 2132 for (Result res : results) { 2133 digest.update(res.getRow()); 2134 } 2135 } 2136 return digest.toString(); 2137 } 2138 2139 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2140 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2141 static { 2142 int i = 0; 2143 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2144 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2145 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2146 ROWS[i][0] = b1; 2147 ROWS[i][1] = b2; 2148 ROWS[i][2] = b3; 2149 i++; 2150 } 2151 } 2152 } 2153 } 2154 2155 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2156 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2157 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), 
Bytes.toBytes("jjj"), 2158 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2159 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2160 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2161 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2162 2163 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2164 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2165 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2166 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2167 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2168 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2169 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2170 2171 /** 2172 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2173 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 
   * @return list of region info for regions added to meta
   */
  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
    final TableDescriptor htd, byte[][] startKeys) throws IOException {
    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
        TableState.State.ENABLED);
      // add custom ones; each region ends where the next (sorted) start key begins, wrapping so
      // the last region's end key is the first start key.
      for (int i = 0; i < startKeys.length; i++) {
        int j = (i + 1) % startKeys.length;
        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
          .setEndKey(startKeys[j]).build();
        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
        newRegions.add(hri);
      }
      return newRegions;
    }
  }

  /**
   * Create an unmanaged WAL. Be sure to close it when you're through.
   */
  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless I pass along via the conf.
    Configuration confForWAL = new Configuration(conf);
    CommonFSUtils.setRootDir(confForWAL, rootDir);
    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
      .getWAL(hri);
  }

  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd) throws IOException {
    return createRegionAndWAL(info, rootDir, conf, htd, true);
  }

  /**
   * Create a region with it's own WAL and the given block cache. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
    // Create uninitialized so the cache can be injected before initialize().
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setBlockCache(blockCache);
    region.initialize();
    return region;
  }

  /**
   * Create a region with it's own WAL and the given mob file cache. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
    throws IOException {
    // Create uninitialized so the cache can be injected before initialize().
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setMobFileCache(mobFileCache);
    region.initialize();
    return region;
  }

  /**
   * Create a region with it's own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    WAL wal = createWal(conf, rootDir, info);
    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
  }

  /**
   * Find any other region server which is different from the one identified by parameter
   * @return another region server
   */
  public HRegionServer getOtherRegionServer(HRegionServer rs) {
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (!(rst.getRegionServer() == rs)) {
        return rst.getRegionServer();
      }
    }
    return null;
  }

  /**
   * Tool to get the reference to the region server object that holds the region of the specified
   * user table.
   * @param tableName user table to lookup in hbase:meta
   * @return region server that holds it, null if the row doesn't exist
   */
  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
    throws IOException, InterruptedException {
    List<RegionInfo> regions = getAdmin().getRegions(tableName);
    if (regions == null || regions.isEmpty()) {
      return null;
    }
    LOG.debug("Found " + regions.size() + " regions for table " + tableName);

    // Pick the first region that is not offline.
    byte[] firstRegionName =
      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));

    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // NOTE(review): the retry interval is declared in MICROSECONDS while HBASE_CLIENT_PAUSE is
    // conventionally a millisecond value -- confirm this unit is intentional.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS);
    while (retrier.shouldRetry()) {
      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
      if (index != -1) {
        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
      }
      // Came back -1. Region may not be online yet. Sleep a while.
      retrier.sleepUntilNextRetry();
    }
    return null;
  }

  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s.
   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
   * MiniMRCluster instances with different log dirs.
   * @throws IOException When starting the cluster fails.
   */
  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
      "99.0");
    startMiniMapReduceCluster(2);
    return mrCluster;
  }

  /**
   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
   * multiple MiniMRCluster instances with different log dirs.
   * @param servers The number of <code>TaskTracker</code>'s to start.
   * @throws IOException When starting the cluster fails.
   */
  private void startMiniMapReduceCluster(final int servers) throws IOException {
    if (mrCluster != null) {
      throw new IllegalStateException("MiniMRCluster is already running");
    }
    LOG.info("Starting mini mapreduce cluster...");
    setupClusterTestDir();
    createDirsAndSetProperties();

    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // we up the VM usable so that processes don't get killed.
    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);

    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
    // this avoids the problem by disabling speculative task execution in tests.
    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    ////

    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
    if (JVM.getJVMSpecVersion() >= 17) {
      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
      conf.set("yarn.app.mapreduce.am.command-opts",
        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
    }

    // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem
    // instance and cause trouble in HBase. See HBASE-29802 for more details.
    JobConf mrClusterConf = new JobConf(conf);
    mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true);
    // Allow the user to override FS URI for this map-reduce cluster to use.
    mrCluster =
      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
        1, null, null, mrClusterConf);
    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
    if (jobConf == null) {
      jobConf = mrCluster.createJobConf();
    }

    // Hadoop MiniMR overwrites this while it should not
    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
    LOG.info("Mini mapreduce cluster started");

    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
    // necessary config properties here. YARN-129 required adding a few properties.
    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
    // this for mrv2 support; mr1 ignores this
    conf.set("mapreduce.framework.name", "yarn");
    conf.setBoolean("yarn.is.minicluster", true);
    String rmAddress = jobConf.get("yarn.resourcemanager.address");
    if (rmAddress != null) {
      conf.set("yarn.resourcemanager.address", rmAddress);
    }
    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
    if (historyAddress != null) {
      conf.set("mapreduce.jobhistory.address", historyAddress);
    }
    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
    if (schedulerAddress != null) {
      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
    }
    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
    if (mrJobHistoryWebappAddress != null) {
      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
    }
    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
    if (yarnRMWebappAddress != null) {
      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
    }
  }

  /**
   * Stops the previously started <code>MiniMRCluster</code>.
   */
  public void shutdownMiniMapReduceCluster() {
    if (mrCluster != null) {
      LOG.info("Stopping mini mapreduce cluster...");
      mrCluster.shutdown();
      mrCluster = null;
      LOG.info("Mini mapreduce cluster stopped");
    }
    // Restore configuration to point to local jobtracker
    conf.set("mapreduce.jobtracker.address", "local");
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS.
   */
  public RegionServerServices createMockRegionServerService() throws IOException {
    return createMockRegionServerService((ServerName) null);
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
   * TestTokenAuthentication
   */
  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
    throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
    rss.setFileSystem(getTestFileSystem());
    rss.setRpcServer(rpc);
    return rss;
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
   * TestOpenRegionHandler
   */
  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
    rss.setFileSystem(getTestFileSystem());
    return rss;
  }

  /**
   * Expire the Master's session
   */
  public void expireMasterSession() throws Exception {
    HMaster master = getMiniHBaseCluster().getMaster();
    expireSession(master.getZooKeeper(), false);
  }

  /**
   * Expire a region server's session
   * @param index which RS
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
    expireSession(rs.getZooKeeper(), false);
    decrementMinRegionServerCount();
  }

  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for newly spawned master
    // this.hbaseCluster shares this configuration too
    decrementMinRegionServerCount(getConfiguration());

    // each master thread keeps a copy of configuration
    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
      decrementMinRegionServerCount(master.getMaster().getConfiguration());
    }
  }

  private void decrementMinRegionServerCount(Configuration conf) {
    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    if (currentCount != -1) {
      // Never drop below one required region server.
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
    }
  }

  public void expireSession(ZKWatcher nodeZK) throws Exception {
    expireSession(nodeZK, false);
  }

  /**
   * Expire a ZooKeeper session as recommended in ZooKeeper documentation <a href=
   * "https://hbase.apache.org/docs/troubleshooting#troubleshooting-zookeeper">Troubleshooting
   * ZooKeeper</a>
   * <p/>
   * There are issues when doing this:
   * <ol>
   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
   * </ol>
   * @param nodeZK      - the ZK watcher to expire
   * @param checkStatus - true to check if we can create a Table with the current configuration.
   */
  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();

    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event
    // other watchers on the same machine will get is as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so don't have to capture the event, just
    // closing the connection should be enough.
2506 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() { 2507 @Override 2508 public void process(WatchedEvent watchedEvent) { 2509 LOG.info("Monitor ZKW received event=" + watchedEvent); 2510 } 2511 }, sessionID, password); 2512 2513 // Making it expire 2514 ZooKeeper newZK = 2515 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password); 2516 2517 // ensure that we have connection to the server before closing down, otherwise 2518 // the close session event will be eaten out before we start CONNECTING state 2519 long start = EnvironmentEdgeManager.currentTime(); 2520 while ( 2521 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000 2522 ) { 2523 Thread.sleep(1); 2524 } 2525 newZK.close(); 2526 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID)); 2527 2528 // Now closing & waiting to be sure that the clients get it. 2529 monitor.close(); 2530 2531 if (checkStatus) { 2532 getConnection().getTable(TableName.META_TABLE_NAME).close(); 2533 } 2534 } 2535 2536 /** 2537 * Get the Mini HBase cluster. 2538 * @return hbase cluster 2539 * @see #getHBaseClusterInterface() 2540 */ 2541 public SingleProcessHBaseCluster getHBaseCluster() { 2542 return getMiniHBaseCluster(); 2543 } 2544 2545 /** 2546 * Returns the HBaseCluster instance. 2547 * <p> 2548 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this 2549 * should not assume that the cluster is a mini cluster or a distributed one. If the test only 2550 * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used 2551 * instead w/o the need to type-cast. 2552 */ 2553 public HBaseClusterInterface getHBaseClusterInterface() { 2554 // implementation note: we should rename this method as #getHBaseCluster(), 2555 // but this would require refactoring 90+ calls. 
2556 return hbaseCluster; 2557 } 2558 2559 /** 2560 * Resets the connections so that the next time getConnection() is called, a new connection is 2561 * created. This is needed in cases where the entire cluster / all the masters are shutdown and 2562 * the connection is not valid anymore. 2563 * <p/> 2564 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are 2565 * written, not all start() stop() calls go through this class. Most tests directly operate on the 2566 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain 2567 * the connection state automatically. Cleaning this is a much bigger refactor. 2568 */ 2569 public void invalidateConnection() throws IOException { 2570 closeConnection(); 2571 // Update the master addresses if they changed. 2572 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY); 2573 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY); 2574 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}", 2575 masterConfigBefore, masterConfAfter); 2576 conf.set(HConstants.MASTER_ADDRS_KEY, 2577 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY)); 2578 } 2579 2580 /** 2581 * Get a shared Connection to the cluster. this method is thread safe. 2582 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. 2583 */ 2584 public Connection getConnection() throws IOException { 2585 return getAsyncConnection().toConnection(); 2586 } 2587 2588 /** 2589 * Get a assigned Connection to the cluster. this method is thread safe. 2590 * @param user assigned user 2591 * @return A Connection with assigned user. 2592 */ 2593 public Connection getConnection(User user) throws IOException { 2594 return getAsyncConnection(user).toConnection(); 2595 } 2596 2597 /** 2598 * Get a shared AsyncClusterConnection to the cluster. this method is thread safe. 
2599 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown 2600 * of cluster. 2601 */ 2602 public AsyncClusterConnection getAsyncConnection() throws IOException { 2603 try { 2604 return asyncConnection.updateAndGet(connection -> { 2605 if (connection == null) { 2606 try { 2607 User user = UserProvider.instantiate(conf).getCurrent(); 2608 connection = getAsyncConnection(user); 2609 } catch (IOException ioe) { 2610 throw new UncheckedIOException("Failed to create connection", ioe); 2611 } 2612 } 2613 return connection; 2614 }); 2615 } catch (UncheckedIOException exception) { 2616 throw exception.getCause(); 2617 } 2618 } 2619 2620 /** 2621 * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe. 2622 * @param user assigned user 2623 * @return An AsyncClusterConnection with assigned user. 2624 */ 2625 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2626 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2627 } 2628 2629 public void closeConnection() throws IOException { 2630 if (hbaseAdmin != null) { 2631 Closeables.close(hbaseAdmin, true); 2632 hbaseAdmin = null; 2633 } 2634 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2635 if (asyncConnection != null) { 2636 Closeables.close(asyncConnection, true); 2637 } 2638 } 2639 2640 /** 2641 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2642 * it has no effect, it will be closed automatically when the cluster shutdowns 2643 */ 2644 public Admin getAdmin() throws IOException { 2645 if (hbaseAdmin == null) { 2646 this.hbaseAdmin = getConnection().getAdmin(); 2647 } 2648 return hbaseAdmin; 2649 } 2650 2651 private Admin hbaseAdmin = null; 2652 2653 /** 2654 * Returns an {@link Hbck} instance. Needs be closed when done. 
2655 */ 2656 public Hbck getHbck() throws IOException { 2657 return getConnection().getHbck(); 2658 } 2659 2660 /** 2661 * Unassign the named region. 2662 * @param regionName The region to unassign. 2663 */ 2664 public void unassignRegion(String regionName) throws IOException { 2665 unassignRegion(Bytes.toBytes(regionName)); 2666 } 2667 2668 /** 2669 * Unassign the named region. 2670 * @param regionName The region to unassign. 2671 */ 2672 public void unassignRegion(byte[] regionName) throws IOException { 2673 getAdmin().unassign(regionName); 2674 } 2675 2676 /** 2677 * Closes the region containing the given row. 2678 * @param row The row to find the containing region. 2679 * @param table The table to find the region. 2680 */ 2681 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 2682 unassignRegionByRow(Bytes.toBytes(row), table); 2683 } 2684 2685 /** 2686 * Closes the region containing the given row. 2687 * @param row The row to find the containing region. 2688 * @param table The table to find the region. 
2689 */ 2690 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 2691 HRegionLocation hrl = table.getRegionLocation(row); 2692 unassignRegion(hrl.getRegion().getRegionName()); 2693 } 2694 2695 /** 2696 * Retrieves a splittable region randomly from tableName 2697 * @param tableName name of table 2698 * @param maxAttempts maximum number of attempts, unlimited for value of -1 2699 * @return the HRegion chosen, null if none was found within limit of maxAttempts 2700 */ 2701 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 2702 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 2703 int regCount = regions.size(); 2704 Set<Integer> attempted = new HashSet<>(); 2705 int idx; 2706 int attempts = 0; 2707 do { 2708 regions = getHBaseCluster().getRegions(tableName); 2709 if (regCount != regions.size()) { 2710 // if there was region movement, clear attempted Set 2711 attempted.clear(); 2712 } 2713 regCount = regions.size(); 2714 // There are chances that before we get the region for the table from an RS the region may 2715 // be going for CLOSE. 
This may be because online schema change is enabled 2716 if (regCount > 0) { 2717 idx = ThreadLocalRandom.current().nextInt(regCount); 2718 // if we have just tried this region, there is no need to try again 2719 if (attempted.contains(idx)) { 2720 continue; 2721 } 2722 HRegion region = regions.get(idx); 2723 if (region.checkSplit().isPresent()) { 2724 return region; 2725 } 2726 attempted.add(idx); 2727 } 2728 attempts++; 2729 } while (maxAttempts == -1 || attempts < maxAttempts); 2730 return null; 2731 } 2732 2733 public MiniDFSCluster getDFSCluster() { 2734 return dfsCluster; 2735 } 2736 2737 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException { 2738 setDFSCluster(cluster, true); 2739 } 2740 2741 /** 2742 * Set the MiniDFSCluster 2743 * @param cluster cluster to use 2744 * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it 2745 * is set. 2746 * @throws IllegalStateException if the passed cluster is up when it is required to be down 2747 * @throws IOException if the FileSystem could not be set from the passed dfs cluster 2748 */ 2749 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown) 2750 throws IllegalStateException, IOException { 2751 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) { 2752 throw new IllegalStateException("DFSCluster is already running! Shut it down first."); 2753 } 2754 this.dfsCluster = cluster; 2755 this.setFs(); 2756 } 2757 2758 public FileSystem getTestFileSystem() throws IOException { 2759 return HFileSystem.get(conf); 2760 } 2761 2762 /** 2763 * Wait until all regions in a table have been assigned. Waits default timeout before giving up 2764 * (30 seconds). 2765 * @param table Table to wait on. 
2766 */ 2767 public void waitTableAvailable(TableName table) throws InterruptedException, IOException { 2768 waitTableAvailable(table.getName(), 30000); 2769 } 2770 2771 public void waitTableAvailable(TableName table, long timeoutMillis) 2772 throws InterruptedException, IOException { 2773 waitFor(timeoutMillis, predicateTableAvailable(table)); 2774 } 2775 2776 /** 2777 * Wait until all regions in a table have been assigned 2778 * @param table Table to wait on. 2779 * @param timeoutMillis Timeout. 2780 */ 2781 public void waitTableAvailable(byte[] table, long timeoutMillis) 2782 throws InterruptedException, IOException { 2783 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table))); 2784 } 2785 2786 public String explainTableAvailability(TableName tableName) throws IOException { 2787 StringBuilder msg = 2788 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", "); 2789 if (getHBaseCluster().getMaster().isAlive()) { 2790 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager() 2791 .getRegionStates().getRegionAssignments(); 2792 final List<Pair<RegionInfo, ServerName>> metaLocations = 2793 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName); 2794 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) { 2795 RegionInfo hri = metaLocation.getFirst(); 2796 ServerName sn = metaLocation.getSecond(); 2797 if (!assignments.containsKey(hri)) { 2798 msg.append(", region ").append(hri) 2799 .append(" not assigned, but found in meta, it expected to be on ").append(sn); 2800 } else if (sn == null) { 2801 msg.append(", region ").append(hri).append(" assigned, but has no server in meta"); 2802 } else if (!sn.equals(assignments.get(hri))) { 2803 msg.append(", region ").append(hri) 2804 .append(" assigned, but has different servers in meta and AM ( ").append(sn) 2805 .append(" <> ").append(assignments.get(hri)); 2806 } 2807 } 2808 } 2809 return msg.toString(); 2810 } 
2811 2812 public String explainTableState(final TableName table, TableState.State state) 2813 throws IOException { 2814 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); 2815 if (tableState == null) { 2816 return "TableState in META: No table state in META for table " + table 2817 + " last state in meta (including deleted is " + findLastTableState(table) + ")"; 2818 } else if (!tableState.inStates(state)) { 2819 return "TableState in META: Not " + state + " state, but " + tableState; 2820 } else { 2821 return "TableState in META: OK"; 2822 } 2823 } 2824 2825 @Nullable 2826 public TableState findLastTableState(final TableName table) throws IOException { 2827 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null); 2828 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 2829 @Override 2830 public boolean visit(Result r) throws IOException { 2831 if (!Arrays.equals(r.getRow(), table.getName())) { 2832 return false; 2833 } 2834 TableState state = CatalogFamilyFormat.getTableState(r); 2835 if (state != null) { 2836 lastTableState.set(state); 2837 } 2838 return true; 2839 } 2840 }; 2841 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, 2842 Integer.MAX_VALUE, visitor); 2843 return lastTableState.get(); 2844 } 2845 2846 /** 2847 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2848 * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent 2849 * table. 2850 * @param table the table to wait on. 2851 * @throws InterruptedException if interrupted while waiting 2852 * @throws IOException if an IO problem is encountered 2853 */ 2854 public void waitTableEnabled(TableName table) throws InterruptedException, IOException { 2855 waitTableEnabled(table, 30000); 2856 } 2857 2858 /** 2859 * Waits for a table to be 'enabled'. 
Enabled means that table is set as 'enabled' and the regions 2860 * have been all assigned. 2861 * @see #waitTableEnabled(TableName, long) 2862 * @param table Table to wait on. 2863 * @param timeoutMillis Time to wait on it being marked enabled. 2864 */ 2865 public void waitTableEnabled(byte[] table, long timeoutMillis) 2866 throws InterruptedException, IOException { 2867 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2868 } 2869 2870 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2871 waitFor(timeoutMillis, predicateTableEnabled(table)); 2872 } 2873 2874 /** 2875 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 2876 * after default period (30 seconds) 2877 * @param table Table to wait on. 2878 */ 2879 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2880 waitTableDisabled(table, 30000); 2881 } 2882 2883 public void waitTableDisabled(TableName table, long millisTimeout) 2884 throws InterruptedException, IOException { 2885 waitFor(millisTimeout, predicateTableDisabled(table)); 2886 } 2887 2888 /** 2889 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 2890 * @param table Table to wait on. 2891 * @param timeoutMillis Time to wait on it being marked disabled. 
2892 */ 2893 public void waitTableDisabled(byte[] table, long timeoutMillis) 2894 throws InterruptedException, IOException { 2895 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2896 } 2897 2898 /** 2899 * Make sure that at least the specified number of region servers are running 2900 * @param num minimum number of region servers that should be running 2901 * @return true if we started some servers 2902 */ 2903 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2904 boolean startedServer = false; 2905 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2906 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2907 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2908 startedServer = true; 2909 } 2910 2911 return startedServer; 2912 } 2913 2914 /** 2915 * Make sure that at least the specified number of region servers are running. We don't count the 2916 * ones that are currently stopping or are stopped. 2917 * @param num minimum number of region servers that should be running 2918 * @return true if we started some servers 2919 */ 2920 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2921 boolean startedServer = ensureSomeRegionServersAvailable(num); 2922 2923 int nonStoppedServers = 0; 2924 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2925 2926 HRegionServer hrs = rst.getRegionServer(); 2927 if (hrs.isStopping() || hrs.isStopped()) { 2928 LOG.info("A region server is stopped or stopping:" + hrs); 2929 } else { 2930 nonStoppedServers++; 2931 } 2932 } 2933 for (int i = nonStoppedServers; i < num; ++i) { 2934 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2935 startedServer = true; 2936 } 2937 return startedServer; 2938 } 2939 2940 /** 2941 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 
2942 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 2943 * @param c Initial configuration 2944 * @param differentiatingSuffix Suffix to differentiate this user from others. 2945 * @return A new configuration instance with a different user set into it. 2946 */ 2947 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2948 throws IOException { 2949 FileSystem currentfs = FileSystem.get(c); 2950 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2951 return User.getCurrent(); 2952 } 2953 // Else distributed filesystem. Make a new instance per daemon. Below 2954 // code is taken from the AppendTestUtil over in hdfs. 2955 String username = User.getCurrent().getName() + differentiatingSuffix; 2956 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2957 return user; 2958 } 2959 2960 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2961 throws IOException { 2962 NavigableSet<String> online = new TreeSet<>(); 2963 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2964 try { 2965 for (RegionInfo region : ProtobufUtil 2966 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2967 online.add(region.getRegionNameAsString()); 2968 } 2969 } catch (RegionServerStoppedException e) { 2970 // That's fine. 2971 } 2972 } 2973 return online; 2974 } 2975 2976 /** 2977 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests 2978 * linger. Here is the exception you'll see: 2979 * 2980 * <pre> 2981 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2982 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2983 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2984 * failed 4 times. 
Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2985 * </pre> 2986 * 2987 * @param stream A DFSClient.DFSOutputStream. 2988 */ 2989 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 2990 try { 2991 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 2992 for (Class<?> clazz : clazzes) { 2993 String className = clazz.getSimpleName(); 2994 if (className.equals("DFSOutputStream")) { 2995 if (clazz.isInstance(stream)) { 2996 Field maxRecoveryErrorCountField = 2997 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 2998 maxRecoveryErrorCountField.setAccessible(true); 2999 maxRecoveryErrorCountField.setInt(stream, max); 3000 break; 3001 } 3002 } 3003 } 3004 } catch (Exception e) { 3005 LOG.info("Could not set max recovery field", e); 3006 } 3007 } 3008 3009 /** 3010 * Uses directly the assignment manager to assign the region. and waits until the specified region 3011 * has completed assignment. 3012 * @return true if the region is assigned false otherwise. 3013 */ 3014 public boolean assignRegion(final RegionInfo regionInfo) 3015 throws IOException, InterruptedException { 3016 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3017 am.assign(regionInfo); 3018 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3019 } 3020 3021 /** 3022 * Move region to destination server and wait till region is completely moved and online 3023 * @param destRegion region to move 3024 * @param destServer destination server of the region 3025 */ 3026 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3027 throws InterruptedException, IOException { 3028 HMaster master = getMiniHBaseCluster().getMaster(); 3029 // TODO: Here we start the move. The move can take a while. 
3030 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3031 while (true) { 3032 ServerName serverName = 3033 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3034 if (serverName != null && serverName.equals(destServer)) { 3035 assertRegionOnServer(destRegion, serverName, 2000); 3036 break; 3037 } 3038 Thread.sleep(10); 3039 } 3040 } 3041 3042 /** 3043 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3044 * configuable timeout value (default is 60 seconds) This means all regions have been deployed, 3045 * master has been informed and updated hbase:meta with the regions deployed server. 3046 * @param tableName the table name 3047 */ 3048 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3049 waitUntilAllRegionsAssigned(tableName, 3050 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3051 } 3052 3053 /** 3054 * Waith until all system table's regions get assigned 3055 */ 3056 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3057 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3058 } 3059 3060 /** 3061 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3062 * timeout. This means all regions have been deployed, master has been informed and updated 3063 * hbase:meta with the regions deployed server. 3064 * @param tableName the table name 3065 * @param timeout timeout, in milliseconds 3066 */ 3067 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3068 throws IOException { 3069 if (!TableName.isMetaTableName(tableName)) { 3070 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3071 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. 
Timeout = " 3072 + timeout + "ms"); 3073 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3074 @Override 3075 public String explainFailure() throws IOException { 3076 return explainTableAvailability(tableName); 3077 } 3078 3079 @Override 3080 public boolean evaluate() throws IOException { 3081 Scan scan = new Scan(); 3082 scan.addFamily(HConstants.CATALOG_FAMILY); 3083 boolean tableFound = false; 3084 try (ResultScanner s = meta.getScanner(scan)) { 3085 for (Result r; (r = s.next()) != null;) { 3086 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3087 RegionInfo info = RegionInfo.parseFromOrNull(b); 3088 if (info != null && info.getTable().equals(tableName)) { 3089 // Get server hosting this region from catalog family. Return false if no server 3090 // hosting this region, or if the server hosting this region was recently killed 3091 // (for fault tolerance testing). 3092 tableFound = true; 3093 byte[] server = 3094 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3095 if (server == null) { 3096 return false; 3097 } else { 3098 byte[] startCode = 3099 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3100 ServerName serverName = 3101 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3102 + Bytes.toLong(startCode)); 3103 if ( 3104 !getHBaseClusterInterface().isDistributedCluster() 3105 && getHBaseCluster().isKilledRS(serverName) 3106 ) { 3107 return false; 3108 } 3109 } 3110 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3111 return false; 3112 } 3113 } 3114 } 3115 } 3116 if (!tableFound) { 3117 LOG.warn( 3118 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3119 } 3120 return tableFound; 3121 } 3122 }); 3123 } 3124 } 3125 LOG.info("All regions for table " + tableName + " assigned to meta. 
Checking AM states."); 3126 // check from the master state if we are using a mini cluster 3127 if (!getHBaseClusterInterface().isDistributedCluster()) { 3128 // So, all regions are in the meta table but make sure master knows of the assignments before 3129 // returning -- sometimes this can lag. 3130 HMaster master = getHBaseCluster().getMaster(); 3131 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3132 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3133 @Override 3134 public String explainFailure() throws IOException { 3135 return explainTableAvailability(tableName); 3136 } 3137 3138 @Override 3139 public boolean evaluate() throws IOException { 3140 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3141 return hris != null && !hris.isEmpty(); 3142 } 3143 }); 3144 } 3145 LOG.info("All regions for table " + tableName + " assigned."); 3146 } 3147 3148 /** 3149 * Do a small get/scan against one store. This is required because store has no actual methods of 3150 * querying itself, and relies on StoreScanner. 3151 */ 3152 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3153 Scan scan = new Scan(get); 3154 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3155 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3156 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3157 // readpoint 0. 3158 0); 3159 3160 List<Cell> result = new ArrayList<>(); 3161 scanner.next(result); 3162 if (!result.isEmpty()) { 3163 // verify that we are on the row we want: 3164 Cell kv = result.get(0); 3165 if (!CellUtil.matchingRows(kv, get.getRow())) { 3166 result.clear(); 3167 } 3168 } 3169 scanner.close(); 3170 return result; 3171 } 3172 3173 /** 3174 * Create region split keys between startkey and endKey 3175 * @param numRegions the number of regions to be created. it has to be greater than 3. 
3176 * @return resulting split keys 3177 */ 3178 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3179 assertTrue(numRegions > 3); 3180 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3181 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3182 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3183 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3184 return result; 3185 } 3186 3187 /** 3188 * Do a small get/scan against one store. This is required because store has no actual methods of 3189 * querying itself, and relies on StoreScanner. 3190 */ 3191 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3192 throws IOException { 3193 Get get = new Get(row); 3194 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3195 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3196 3197 return getFromStoreFile(store, get); 3198 } 3199 3200 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3201 final List<? extends Cell> actual) { 3202 final int eLen = expected.size(); 3203 final int aLen = actual.size(); 3204 final int minLen = Math.min(eLen, aLen); 3205 3206 int i = 0; 3207 while ( 3208 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3209 ) { 3210 i++; 3211 } 3212 3213 if (additionalMsg == null) { 3214 additionalMsg = ""; 3215 } 3216 if (!additionalMsg.isEmpty()) { 3217 additionalMsg = ". " + additionalMsg; 3218 } 3219 3220 if (eLen != aLen || i != minLen) { 3221 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3222 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. 
" + safeGetAsStr(actual, i) 3223 + " (length " + aLen + ")" + additionalMsg); 3224 } 3225 } 3226 3227 public static <T> String safeGetAsStr(List<T> lst, int i) { 3228 if (0 <= i && i < lst.size()) { 3229 return lst.get(i).toString(); 3230 } else { 3231 return "<out_of_range>"; 3232 } 3233 } 3234 3235 public String getRpcConnnectionURI() throws UnknownHostException { 3236 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3237 } 3238 3239 public String getZkConnectionURI() { 3240 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3241 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3242 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3243 } 3244 3245 /** 3246 * Get the zk based cluster key for this cluster. 3247 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the 3248 * connection info of a cluster. Keep here only for compatibility. 3249 * @see #getRpcConnnectionURI() 3250 * @see #getZkConnectionURI() 3251 */ 3252 @Deprecated 3253 public String getClusterKey() { 3254 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3255 + ":" 3256 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3257 } 3258 3259 /** 3260 * Creates a random table with the given parameters 3261 */ 3262 public Table createRandomTable(TableName tableName, final Collection<String> families, 3263 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3264 final int numRowsPerFlush) throws IOException, InterruptedException { 3265 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3266 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3267 + maxVersions + "\n"); 3268 3269 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3270 final int numCF = families.size(); 3271 final 
byte[][] cfBytes = new byte[numCF][]; 3272 { 3273 int cfIndex = 0; 3274 for (String cf : families) { 3275 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3276 } 3277 } 3278 3279 final int actualStartKey = 0; 3280 final int actualEndKey = Integer.MAX_VALUE; 3281 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3282 final int splitStartKey = actualStartKey + keysPerRegion; 3283 final int splitEndKey = actualEndKey - keysPerRegion; 3284 final String keyFormat = "%08x"; 3285 final Table table = createTable(tableName, cfBytes, maxVersions, 3286 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3287 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3288 3289 if (hbaseCluster != null) { 3290 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3291 } 3292 3293 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3294 3295 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3296 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3297 final byte[] row = Bytes.toBytes( 3298 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3299 3300 Put put = new Put(row); 3301 Delete del = new Delete(row); 3302 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3303 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3304 final long ts = rand.nextInt(); 3305 final byte[] qual = Bytes.toBytes("col" + iCol); 3306 if (rand.nextBoolean()) { 3307 final byte[] value = 3308 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3309 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3310 put.addColumn(cf, qual, ts, value); 3311 } else if (rand.nextDouble() < 0.8) { 3312 del.addColumn(cf, qual, ts); 3313 } else { 3314 del.addColumns(cf, qual, ts); 3315 } 3316 } 3317 3318 if (!put.isEmpty()) { 3319 mutator.mutate(put); 3320 } 3321 3322 if (!del.isEmpty()) { 3323 mutator.mutate(del); 3324 } 3325 } 3326 LOG.info("Initiating flush #" + iFlush + " for table " + 
tableName); 3327 mutator.flush(); 3328 if (hbaseCluster != null) { 3329 getMiniHBaseCluster().flushcache(table.getName()); 3330 } 3331 } 3332 mutator.close(); 3333 3334 return table; 3335 } 3336 3337 public static int randomFreePort() { 3338 return HBaseCommonTestingUtil.randomFreePort(); 3339 } 3340 3341 public static String randomMultiCastAddress() { 3342 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3343 } 3344 3345 public static void waitForHostPort(String host, int port) throws IOException { 3346 final int maxTimeMs = 10000; 3347 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3348 IOException savedException = null; 3349 LOG.info("Waiting for server at " + host + ":" + port); 3350 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3351 try { 3352 Socket sock = new Socket(InetAddress.getByName(host), port); 3353 sock.close(); 3354 savedException = null; 3355 LOG.info("Server at " + host + ":" + port + " is available"); 3356 break; 3357 } catch (UnknownHostException e) { 3358 throw new IOException("Failed to look up " + host, e); 3359 } catch (IOException e) { 3360 savedException = e; 3361 } 3362 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3363 } 3364 3365 if (savedException != null) { 3366 throw savedException; 3367 } 3368 } 3369 3370 public static int getMetaRSPort(Connection connection) throws IOException { 3371 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3372 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3373 } 3374 } 3375 3376 /** 3377 * Due to async racing issue, a region may not be in the online region list of a region server 3378 * yet, after the assignment znode is deleted and the new assignment is recorded in master. 
   */
  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      // Poll the server's online-region list until the region shows up or we time out.
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  /**
   * Check to make sure the region is open on the specified region server, but not on any other one.
   */
  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
        // Found on the expected server; now verify no other live RS also hosts it.
        List<JVMClusterUtil.RegionServerThread> rsThreads =
          getHBaseCluster().getLiveRegionServerThreads();
        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
          HRegionServer rs = rsThread.getRegionServer();
          if (server.equals(rs.getServerName())) {
            continue;
          }
          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
          for (HRegion r : hrs) {
            assertTrue(r.getRegionInfo().getRegionId() != hri.getRegionId(),
              "Region should not be double assigned");
          }
        }
        return; // good, we are happy
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  /**
   * Creates a single-family region (with WAL) under the data test dir for unit testing.
   */
  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
  }

  /**
   * Same as {@link #createTestRegion(String, ColumnFamilyDescriptor)} but with an explicit
   * block cache.
   */
  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
    BlockCache blockCache) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
  }

  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }

  /**
   * Returns a {@link Predicate} for checking that there are no regions in transition in master
   */
  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
        return "found in transition: " + am.getRegionsInTransition().toString();
      }

      @Override
      public boolean evaluate() throws IOException {
        // Master/AM may not be up yet during startup; treat that as "not ready".
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) return false;
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) return false;
        return !am.hasRegionsInTransition();
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that there are no procedure to region transition in
   * master
   */
  public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
        return "Number of procedure scheduled for region transit: "
          + am.getRegionTransitScheduledCount();
      }

      @Override
      public boolean evaluate() throws IOException {
        // Master/AM may not be up yet during startup; treat that as "not ready".
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) {
          return false;
        }
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) {
          return false;
        }
        return am.getRegionTransitScheduledCount() == 0;
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is enabled
   */
  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.ENABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is disabled
   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.DISABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().isTableDisabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is available
   */
  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          // Additionally probe every region with a tiny scan to confirm it actually serves reads.
          try (Table table = getConnection().getTable(tableName)) {
            TableDescriptor htd = table.getDescriptor();
            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
              .getAllRegionLocations()) {
              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
              for (byte[] family : htd.getColumnFamilyNames()) {
                scan.addFamily(family);
              }
              try (ResultScanner scanner = table.getScanner(scan)) {
                scanner.next();
              }
            }
          }
        }
        return tableAvailable;
      }
    };
  }

  /**
   * Wait until no regions in transition.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionsInTransition());
  }

  /**
   * Wait until no procedure for region transition is scheduled.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionTransitScheduled());
  }

  /**
   * Wait until no regions in transition. (time limit 15min)
   */
  public void waitUntilNoRegionsInTransition() throws IOException {
    waitUntilNoRegionsInTransition(15 * 60000);
  }

  /**
   * Wait until no TRSP is present
   */
  public void waitUntilNoRegionTransitScheduled() throws IOException {
    waitUntilNoRegionTransitScheduled(15 * 60000);
  }

  /**
   * Wait until labels is ready in VisibilityLabelsCache.
3594 */ 3595 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3596 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3597 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3598 3599 @Override 3600 public boolean evaluate() { 3601 for (String label : labels) { 3602 if (labelsCache.getLabelOrdinal(label) == 0) { 3603 return false; 3604 } 3605 } 3606 return true; 3607 } 3608 3609 @Override 3610 public String explainFailure() { 3611 for (String label : labels) { 3612 if (labelsCache.getLabelOrdinal(label) == 0) { 3613 return label + " is not available yet"; 3614 } 3615 } 3616 return ""; 3617 } 3618 }); 3619 } 3620 3621 /** 3622 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3623 * available. 3624 * @return the list of column descriptors 3625 */ 3626 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3627 return generateColumnDescriptors(""); 3628 } 3629 3630 /** 3631 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3632 * available. 
3633 * @param prefix family names prefix 3634 * @return the list of column descriptors 3635 */ 3636 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3637 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3638 long familyId = 0; 3639 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3640 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3641 for (BloomType bloomType : BloomType.values()) { 3642 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3643 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3644 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3645 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3646 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3647 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3648 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3649 familyId++; 3650 } 3651 } 3652 } 3653 return columnFamilyDescriptors; 3654 } 3655 3656 /** 3657 * Get supported compression algorithms. 3658 * @return supported compression algorithms. 
3659 */ 3660 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3661 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3662 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3663 for (String algoName : allAlgos) { 3664 try { 3665 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3666 algo.getCompressor(); 3667 supportedAlgos.add(algo); 3668 } catch (Throwable t) { 3669 // this algo is not available 3670 } 3671 } 3672 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3673 } 3674 3675 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3676 Scan scan = new Scan().withStartRow(row); 3677 scan.setReadType(ReadType.PREAD); 3678 scan.setCaching(1); 3679 scan.setReversed(true); 3680 scan.addFamily(family); 3681 try (RegionScanner scanner = r.getScanner(scan)) { 3682 List<Cell> cells = new ArrayList<>(1); 3683 scanner.next(cells); 3684 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3685 return null; 3686 } 3687 return Result.create(cells); 3688 } 3689 } 3690 3691 private boolean isTargetTable(final byte[] inRow, Cell c) { 3692 String inputRowString = Bytes.toString(inRow); 3693 int i = inputRowString.indexOf(HConstants.DELIMITER); 3694 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3695 int o = outputRowString.indexOf(HConstants.DELIMITER); 3696 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3697 } 3698 3699 /** 3700 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3701 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3702 * kerby KDC server and utility for using it, 3703 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 3704 * less baggage. It came in in HBASE-5291. 
3705 */ 3706 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3707 Properties conf = MiniKdc.createConf(); 3708 conf.put(MiniKdc.DEBUG, true); 3709 MiniKdc kdc = null; 3710 File dir = null; 3711 // There is time lag between selecting a port and trying to bind with it. It's possible that 3712 // another service captures the port in between which'll result in BindException. 3713 boolean bindException; 3714 int numTries = 0; 3715 do { 3716 try { 3717 bindException = false; 3718 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3719 kdc = new MiniKdc(conf, dir); 3720 kdc.start(); 3721 } catch (BindException e) { 3722 FileUtils.deleteDirectory(dir); // clean directory 3723 numTries++; 3724 if (numTries == 3) { 3725 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3726 throw e; 3727 } 3728 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3729 bindException = true; 3730 } 3731 } while (bindException); 3732 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3733 return kdc; 3734 } 3735 3736 public int getNumHFiles(final TableName tableName, final byte[] family) { 3737 int numHFiles = 0; 3738 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3739 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3740 } 3741 return numHFiles; 3742 } 3743 3744 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3745 final byte[] family) { 3746 int numHFiles = 0; 3747 for (Region region : rs.getRegions(tableName)) { 3748 numHFiles += region.getStore(family).getStorefilesCount(); 3749 } 3750 return numHFiles; 3751 } 3752 3753 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3754 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3755 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3756 
Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3757 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3758 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3759 it2 = rtdFamilies.iterator(); it.hasNext();) { 3760 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3761 } 3762 } 3763 3764 /** 3765 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3766 * invocations. 3767 */ 3768 public static void await(final long sleepMillis, final BooleanSupplier condition) 3769 throws InterruptedException { 3770 try { 3771 while (!condition.getAsBoolean()) { 3772 Thread.sleep(sleepMillis); 3773 } 3774 } catch (RuntimeException e) { 3775 if (e.getCause() instanceof AssertionError) { 3776 throw (AssertionError) e.getCause(); 3777 } 3778 throw e; 3779 } 3780 } 3781 3782 public void createRegionDir(RegionInfo hri) throws IOException { 3783 Path rootDir = getDataTestDir(); 3784 Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable()); 3785 Path regionDir = new Path(tableDir, hri.getEncodedName()); 3786 FileSystem fs = getTestFileSystem(); 3787 if (!fs.exists(regionDir)) { 3788 fs.mkdirs(regionDir); 3789 } 3790 } 3791 3792 public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem) 3793 throws IOException { 3794 Path tableDir = 3795 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable()); 3796 HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir, 3797 regionInfo); 3798 } 3799}