001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.File; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.io.UncheckedIOException; 030import java.lang.reflect.Field; 031import java.net.BindException; 032import java.net.DatagramSocket; 033import java.net.InetAddress; 034import java.net.ServerSocket; 035import java.net.Socket; 036import java.net.UnknownHostException; 037import java.nio.charset.StandardCharsets; 038import java.security.MessageDigest; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.HashSet; 044import java.util.Iterator; 045import java.util.List; 046import java.util.Map; 047import java.util.NavigableSet; 048import java.util.Properties; 049import java.util.Random; 050import java.util.Set; 051import java.util.TreeSet; 052import java.util.concurrent.ExecutionException; 053import 
java.util.concurrent.ThreadLocalRandom; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicReference; 056import java.util.function.BooleanSupplier; 057import org.apache.commons.io.FileUtils; 058import org.apache.commons.lang3.RandomStringUtils; 059import org.apache.hadoop.conf.Configuration; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 063import org.apache.hadoop.hbase.Waiter.Predicate; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.AsyncAdmin; 066import org.apache.hadoop.hbase.client.AsyncClusterConnection; 067import org.apache.hadoop.hbase.client.BufferedMutator; 068import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.Consistency; 074import org.apache.hadoop.hbase.client.Delete; 075import org.apache.hadoop.hbase.client.Durability; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.Hbck; 078import org.apache.hadoop.hbase.client.MasterRegistry; 079import org.apache.hadoop.hbase.client.Put; 080import org.apache.hadoop.hbase.client.RegionInfo; 081import org.apache.hadoop.hbase.client.RegionInfoBuilder; 082import org.apache.hadoop.hbase.client.RegionLocator; 083import org.apache.hadoop.hbase.client.Result; 084import org.apache.hadoop.hbase.client.ResultScanner; 085import org.apache.hadoop.hbase.client.Scan; 086import org.apache.hadoop.hbase.client.Scan.ReadType; 087import org.apache.hadoop.hbase.client.Table; 088import org.apache.hadoop.hbase.client.TableDescriptor; 089import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 090import 
org.apache.hadoop.hbase.client.TableState; 091import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 092import org.apache.hadoop.hbase.fs.HFileSystem; 093import org.apache.hadoop.hbase.io.compress.Compression; 094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 096import org.apache.hadoop.hbase.io.hfile.BlockCache; 097import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 098import org.apache.hadoop.hbase.io.hfile.HFile; 099import org.apache.hadoop.hbase.ipc.RpcServerInterface; 100import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 101import org.apache.hadoop.hbase.master.HMaster; 102import org.apache.hadoop.hbase.master.MasterFileSystem; 103import org.apache.hadoop.hbase.master.RegionState; 104import org.apache.hadoop.hbase.master.ServerManager; 105import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 107import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 108import org.apache.hadoop.hbase.master.assignment.RegionStates; 109import org.apache.hadoop.hbase.mob.MobFileCache; 110import org.apache.hadoop.hbase.regionserver.BloomType; 111import org.apache.hadoop.hbase.regionserver.ChunkCreator; 112import org.apache.hadoop.hbase.regionserver.HRegion; 113import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 114import org.apache.hadoop.hbase.regionserver.HRegionServer; 115import org.apache.hadoop.hbase.regionserver.HStore; 116import org.apache.hadoop.hbase.regionserver.InternalScanner; 117import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 118import org.apache.hadoop.hbase.regionserver.Region; 119import org.apache.hadoop.hbase.regionserver.RegionScanner; 120import org.apache.hadoop.hbase.regionserver.RegionServerServices; 121import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 122import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 
123import org.apache.hadoop.hbase.security.User; 124import org.apache.hadoop.hbase.security.UserProvider; 125import org.apache.hadoop.hbase.security.access.AccessController; 126import org.apache.hadoop.hbase.security.access.PermissionStorage; 127import org.apache.hadoop.hbase.security.access.SecureTestUtil; 128import org.apache.hadoop.hbase.security.token.TokenProvider; 129import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 130import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 131import org.apache.hadoop.hbase.util.Bytes; 132import org.apache.hadoop.hbase.util.CommonFSUtils; 133import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 134import org.apache.hadoop.hbase.util.FSUtils; 135import org.apache.hadoop.hbase.util.JVM; 136import org.apache.hadoop.hbase.util.JVMClusterUtil; 137import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 138import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 139import org.apache.hadoop.hbase.util.Pair; 140import org.apache.hadoop.hbase.util.RetryCounter; 141import org.apache.hadoop.hbase.util.Threads; 142import org.apache.hadoop.hbase.wal.WAL; 143import org.apache.hadoop.hbase.wal.WALFactory; 144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 145import org.apache.hadoop.hbase.zookeeper.ZKConfig; 146import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 147import org.apache.hadoop.hdfs.DFSClient; 148import org.apache.hadoop.hdfs.DFSConfigKeys; 149import org.apache.hadoop.hdfs.DistributedFileSystem; 150import org.apache.hadoop.hdfs.MiniDFSCluster; 151import org.apache.hadoop.hdfs.server.datanode.DataNode; 152import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 153import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 154import org.apache.hadoop.mapred.JobConf; 155import org.apache.hadoop.mapred.MiniMRCluster; 156import org.apache.hadoop.minikdc.MiniKdc; 157import org.apache.yetus.audience.InterfaceAudience; 158import 
org.apache.yetus.audience.InterfaceStability; 159import org.apache.zookeeper.WatchedEvent; 160import org.apache.zookeeper.ZooKeeper; 161import org.apache.zookeeper.ZooKeeper.States; 162 163import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 164 165import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 166 167/** 168 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 169 * functionality. Create an instance and keep it around testing HBase. 170 * <p/> 171 * This class is meant to be your one-stop shop for anything you might need testing. Manages one 172 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster}, 173 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real 174 * cluster. 175 * <p/> 176 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration. 177 * <p/> 178 * It does not set logging levels. 179 * <p/> 180 * In the configuration properties, default values for master-info-port and region-server-port are 181 * overridden such that a random port will be assigned (thus avoiding port contention if another 182 * local HBase instance is already running). 183 * <p/> 184 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" 185 * setting it to true. 186 */ 187@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 188@InterfaceStability.Evolving 189public class HBaseTestingUtil extends HBaseZKTestingUtil { 190 191 public static final int DEFAULT_REGIONS_PER_SERVER = 3; 192 193 private MiniDFSCluster dfsCluster = null; 194 private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null; 195 196 private volatile HBaseClusterInterface hbaseCluster = null; 197 private MiniMRCluster mrCluster = null; 198 199 /** If there is a mini cluster running for this testing utility instance. 
*/ 200 private volatile boolean miniClusterRunning; 201 202 private String hadoopLogDir; 203 204 /** 205 * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility 206 */ 207 private Path dataTestDirOnTestFS = null; 208 209 private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>(); 210 211 /** Filesystem URI used for map-reduce mini-cluster setup */ 212 private static String FS_URI; 213 214 /** This is for unit tests parameterized with a single boolean. */ 215 public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination(); 216 217 /** 218 * Checks to see if a specific port is available. 219 * @param port the port number to check for availability 220 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 221 */ 222 public static boolean available(int port) { 223 ServerSocket ss = null; 224 DatagramSocket ds = null; 225 try { 226 ss = new ServerSocket(port); 227 ss.setReuseAddress(true); 228 ds = new DatagramSocket(port); 229 ds.setReuseAddress(true); 230 return true; 231 } catch (IOException e) { 232 // Do nothing 233 } finally { 234 if (ds != null) { 235 ds.close(); 236 } 237 238 if (ss != null) { 239 try { 240 ss.close(); 241 } catch (IOException e) { 242 /* should not be thrown */ 243 } 244 } 245 } 246 247 return false; 248 } 249 250 /** 251 * Create all combinations of Bloom filters and compression algorithms for testing. 
252 */ 253 private static List<Object[]> bloomAndCompressionCombinations() { 254 List<Object[]> configurations = new ArrayList<>(); 255 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 256 for (BloomType bloomType : BloomType.values()) { 257 configurations.add(new Object[] { comprAlgo, bloomType }); 258 } 259 } 260 return Collections.unmodifiableList(configurations); 261 } 262 263 /** 264 * Create combination of memstoreTS and tags 265 */ 266 private static List<Object[]> memStoreTSAndTagsCombination() { 267 List<Object[]> configurations = new ArrayList<>(); 268 configurations.add(new Object[] { false, false }); 269 configurations.add(new Object[] { false, true }); 270 configurations.add(new Object[] { true, false }); 271 configurations.add(new Object[] { true, true }); 272 return Collections.unmodifiableList(configurations); 273 } 274 275 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 276 List<Object[]> configurations = new ArrayList<>(); 277 configurations.add(new Object[] { false, false, true }); 278 configurations.add(new Object[] { false, false, false }); 279 configurations.add(new Object[] { false, true, true }); 280 configurations.add(new Object[] { false, true, false }); 281 configurations.add(new Object[] { true, false, true }); 282 configurations.add(new Object[] { true, false, false }); 283 configurations.add(new Object[] { true, true, true }); 284 configurations.add(new Object[] { true, true, false }); 285 return Collections.unmodifiableList(configurations); 286 } 287 288 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 289 bloomAndCompressionCombinations(); 290 291 /** 292 * <p> 293 * Create an HBaseTestingUtility using a default configuration. 294 * <p> 295 * Initially, all tmp files are written to a local test data directory. 
Once 296 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 297 * data will be written to the DFS directory instead. 298 */ 299 public HBaseTestingUtil() { 300 this(HBaseConfiguration.create()); 301 } 302 303 /** 304 * <p> 305 * Create an HBaseTestingUtility using a given configuration. 306 * <p> 307 * Initially, all tmp files are written to a local test data directory. Once 308 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 309 * data will be written to the DFS directory instead. 310 * @param conf The configuration to use for further operations 311 */ 312 public HBaseTestingUtil(@Nullable Configuration conf) { 313 super(conf); 314 315 // a hbase checksum verification failure will cause unit tests to fail 316 ChecksumUtil.generateExceptionForChecksumFailureForTest(true); 317 318 // Save this for when setting default file:// breaks things 319 if (this.conf.get("fs.defaultFS") != null) { 320 this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS")); 321 } 322 if (this.conf.get(HConstants.HBASE_DIR) != null) { 323 this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR)); 324 } 325 // Every cluster is a local cluster until we start DFS 326 // Note that conf could be null, but this.conf will not be 327 String dataTestDir = getDataTestDir().toString(); 328 this.conf.set("fs.defaultFS", "file:///"); 329 this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir); 330 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 331 this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 332 // If the value for random ports isn't set set it to true, thus making 333 // tests opt-out for random port assignment 334 this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, 335 this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true)); 336 } 337 338 /** 339 * Close both the region {@code r} and it's underlying WAL. 
For use in tests. 340 */ 341 public static void closeRegionAndWAL(final Region r) throws IOException { 342 closeRegionAndWAL((HRegion) r); 343 } 344 345 /** 346 * Close both the HRegion {@code r} and it's underlying WAL. For use in tests. 347 */ 348 public static void closeRegionAndWAL(final HRegion r) throws IOException { 349 if (r == null) return; 350 r.close(); 351 if (r.getWAL() == null) return; 352 r.getWAL().close(); 353 } 354 355 /** 356 * Start mini secure cluster with given kdc and principals. 357 * @param kdc Mini kdc server 358 * @param servicePrincipal Service principal without realm. 359 * @param spnegoPrincipal Spnego principal without realm. 360 * @return Handler to shutdown the cluster 361 */ 362 public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal, 363 String spnegoPrincipal) throws Exception { 364 Configuration conf = getConfiguration(); 365 366 SecureTestUtil.enableSecurity(conf); 367 VisibilityTestUtil.enableVisiblityLabels(conf); 368 SecureTestUtil.verifyConfiguration(conf); 369 370 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 371 AccessController.class.getName() + ',' + TokenProvider.class.getName()); 372 373 HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(), 374 spnegoPrincipal + '@' + kdc.getRealm()); 375 376 startMiniCluster(); 377 try { 378 waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); 379 } catch (Exception e) { 380 shutdownMiniCluster(); 381 throw e; 382 } 383 384 return this::shutdownMiniCluster; 385 } 386 387 /** 388 * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned 389 * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed 390 * by the Configuration. If say, a Connection was being used against a cluster that had been 391 * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome. 
392 * Rather than use the return direct, its usually best to make a copy and use that. Do 393 * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code> 394 * @return Instance of Configuration. 395 */ 396 @Override 397 public Configuration getConfiguration() { 398 return super.getConfiguration(); 399 } 400 401 public void setHBaseCluster(HBaseClusterInterface hbaseCluster) { 402 this.hbaseCluster = hbaseCluster; 403 } 404 405 /** 406 * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can 407 * have many concurrent tests running if we need to. Moding a System property is not the way to do 408 * concurrent instances -- another instance could grab the temporary value unintentionally -- but 409 * not anything can do about it at moment; single instance only is how the minidfscluster works. 410 * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir 411 * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir 412 * (We do not create them!). 413 * @return The calculated data test build directory, if newly-created. 
414 */ 415 @Override 416 protected Path setupDataTestDir() { 417 Path testPath = super.setupDataTestDir(); 418 if (null == testPath) { 419 return null; 420 } 421 422 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 423 424 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 425 // we want our own value to ensure uniqueness on the same machine 426 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 427 428 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 429 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 430 return testPath; 431 } 432 433 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 434 435 String sysValue = System.getProperty(propertyName); 436 437 if (sysValue != null) { 438 // There is already a value set. So we do nothing but hope 439 // that there will be no conflicts 440 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 441 + " so I do NOT create it in " + parent); 442 String confValue = conf.get(propertyName); 443 if (confValue != null && !confValue.endsWith(sysValue)) { 444 LOG.warn(propertyName + " property value differs in configuration and system: " 445 + "Configuration=" + confValue + " while System=" + sysValue 446 + " Erasing configuration value by system value."); 447 } 448 conf.set(propertyName, sysValue); 449 } else { 450 // Ok, it's not set, so we create it as a subdirectory 451 createSubDir(propertyName, parent, subDirName); 452 System.setProperty(propertyName, conf.get(propertyName)); 453 } 454 } 455 456 /** 457 * @return Where to write test data on the test filesystem; Returns working directory for the test 458 * filesystem by default 459 * @see #setupDataTestDirOnTestFS() 460 * @see #getTestFileSystem() 461 */ 462 private Path getBaseTestDirOnTestFS() throws IOException { 463 FileSystem fs = getTestFileSystem(); 464 return new 
Path(fs.getWorkingDirectory(), "test-data"); 465 } 466 467 /** 468 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 469 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 470 * on it. 471 * @return a unique path in the test filesystem 472 */ 473 public Path getDataTestDirOnTestFS() throws IOException { 474 if (dataTestDirOnTestFS == null) { 475 setupDataTestDirOnTestFS(); 476 } 477 478 return dataTestDirOnTestFS; 479 } 480 481 /** 482 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 483 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 484 * on it. 485 * @return a unique path in the test filesystem 486 * @param subdirName name of the subdir to create under the base test dir 487 */ 488 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 489 return new Path(getDataTestDirOnTestFS(), subdirName); 490 } 491 492 /** 493 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 494 * setup. 495 */ 496 private void setupDataTestDirOnTestFS() throws IOException { 497 if (dataTestDirOnTestFS != null) { 498 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 499 return; 500 } 501 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 502 } 503 504 /** 505 * Sets up a new path in test filesystem to be used by tests. 506 */ 507 private Path getNewDataTestDirOnTestFS() throws IOException { 508 // The file system can be either local, mini dfs, or if the configuration 509 // is supplied externally, it can be an external cluster FS. 
If it is a local 510 // file system, the tests should use getBaseTestDir, otherwise, we can use 511 // the working directory, and create a unique sub dir there 512 FileSystem fs = getTestFileSystem(); 513 Path newDataTestDir; 514 String randomStr = getRandomUUID().toString(); 515 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 516 newDataTestDir = new Path(getDataTestDir(), randomStr); 517 File dataTestDir = new File(newDataTestDir.toString()); 518 if (deleteOnExit()) dataTestDir.deleteOnExit(); 519 } else { 520 Path base = getBaseTestDirOnTestFS(); 521 newDataTestDir = new Path(base, randomStr); 522 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 523 } 524 return newDataTestDir; 525 } 526 527 /** 528 * Cleans the test data directory on the test filesystem. 529 * @return True if we removed the test dirs 530 */ 531 public boolean cleanupDataTestDirOnTestFS() throws IOException { 532 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 533 if (ret) { 534 dataTestDirOnTestFS = null; 535 } 536 return ret; 537 } 538 539 /** 540 * Cleans a subdirectory under the test data directory on the test filesystem. 541 * @return True if we removed child 542 */ 543 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 544 Path cpath = getDataTestDirOnTestFS(subdirName); 545 return getTestFileSystem().delete(cpath, true); 546 } 547 548 /** 549 * Start a minidfscluster. 550 * @param servers How many DNs to start. 551 * @see #shutdownMiniDFSCluster() 552 * @return The mini dfs cluster created. 553 */ 554 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 555 return startMiniDFSCluster(servers, null); 556 } 557 558 /** 559 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 560 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 561 * instances of the datanodes will have the same host name. 
562 * @param hosts hostnames DNs to run on. 563 * @see #shutdownMiniDFSCluster() 564 * @return The mini dfs cluster created. 565 */ 566 public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception { 567 if (hosts != null && hosts.length != 0) { 568 return startMiniDFSCluster(hosts.length, hosts); 569 } else { 570 return startMiniDFSCluster(1, null); 571 } 572 } 573 574 /** 575 * Start a minidfscluster. Can only create one. 576 * @param servers How many DNs to start. 577 * @param hosts hostnames DNs to run on. 578 * @see #shutdownMiniDFSCluster() 579 * @return The mini dfs cluster created. 580 */ 581 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception { 582 return startMiniDFSCluster(servers, null, hosts); 583 } 584 585 private void setFs() throws IOException { 586 if (this.dfsCluster == null) { 587 LOG.info("Skipping setting fs because dfsCluster is null"); 588 return; 589 } 590 FileSystem fs = this.dfsCluster.getFileSystem(); 591 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 592 593 // re-enable this check with dfs 594 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 595 } 596 597 // Workaround to avoid IllegalThreadStateException 598 // See HBASE-27148 for more details 599 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 600 601 private volatile boolean stopped = false; 602 603 private final MiniDFSCluster cluster; 604 605 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 606 super("FsDatasetAsyncDiskServiceFixer"); 607 setDaemon(true); 608 this.cluster = cluster; 609 } 610 611 @Override 612 public void run() { 613 while (!stopped) { 614 try { 615 Thread.sleep(30000); 616 } catch (InterruptedException e) { 617 Thread.currentThread().interrupt(); 618 continue; 619 } 620 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 621 // timeout of the thread pool executor is 60 seconds by default. 
622 try { 623 for (DataNode dn : cluster.getDataNodes()) { 624 FsDatasetSpi<?> dataset = dn.getFSDataset(); 625 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 626 service.setAccessible(true); 627 Object asyncDiskService = service.get(dataset); 628 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 629 group.setAccessible(true); 630 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 631 if (threadGroup.isDaemon()) { 632 threadGroup.setDaemon(false); 633 } 634 } 635 } catch (NoSuchFieldException e) { 636 LOG.debug("NoSuchFieldException: " + e.getMessage() 637 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 638 + "See HBASE-27595 for details."); 639 } catch (Exception e) { 640 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 641 } 642 } 643 } 644 645 void shutdown() { 646 stopped = true; 647 interrupt(); 648 } 649 } 650 651 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts) 652 throws Exception { 653 createDirsAndSetProperties(); 654 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 655 656 this.dfsCluster = 657 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 658 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 659 this.dfsClusterFixer.start(); 660 // Set this just-started cluster as our filesystem. 
661 setFs(); 662 663 // Wait for the cluster to be totally up 664 this.dfsCluster.waitClusterUp(); 665 666 // reset the test directory for test file system 667 dataTestDirOnTestFS = null; 668 String dataTestDir = getDataTestDir().toString(); 669 conf.set(HConstants.HBASE_DIR, dataTestDir); 670 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 671 672 return this.dfsCluster; 673 } 674 675 public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { 676 createDirsAndSetProperties(); 677 dfsCluster = 678 new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); 679 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 680 this.dfsClusterFixer.start(); 681 return dfsCluster; 682 } 683 684 /** 685 * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to 686 * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in 687 * the conf. 
688 * 689 * <pre> 690 * Configuration conf = TEST_UTIL.getConfiguration(); 691 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 692 * Map.Entry<String, String> e = i.next(); 693 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 694 * } 695 * </pre> 696 */ 697 private void createDirsAndSetProperties() throws IOException { 698 setupClusterTestDir(); 699 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 700 createDirAndSetProperty("test.cache.data"); 701 createDirAndSetProperty("hadoop.tmp.dir"); 702 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 703 createDirAndSetProperty("mapreduce.cluster.local.dir"); 704 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 705 enableShortCircuit(); 706 707 Path root = getDataTestDirOnTestFS("hadoop"); 708 conf.set(MapreduceTestingShim.getMROutputDirProp(), 709 new Path(root, "mapred-output-dir").toString()); 710 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 711 conf.set("mapreduce.jobtracker.staging.root.dir", 712 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 713 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 714 conf.set("yarn.app.mapreduce.am.staging-dir", 715 new Path(root, "mapreduce-am-staging-root-dir").toString()); 716 717 // Frustrate yarn's and hdfs's attempts at writing /tmp. 718 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 
719 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 720 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 721 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 722 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 723 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 724 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 725 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 726 createDirAndSetProperty("dfs.journalnode.edits.dir"); 727 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 728 createDirAndSetProperty("nfs.dump.dir"); 729 createDirAndSetProperty("java.io.tmpdir"); 730 createDirAndSetProperty("dfs.journalnode.edits.dir"); 731 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 732 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 733 734 // disable metrics logger since it depend on commons-logging internal classes and we do not want 735 // commons-logging on our classpath 736 conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 737 conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 738 } 739 740 /** 741 * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families. 742 * Default to false. 743 */ 744 public boolean isNewVersionBehaviorEnabled() { 745 final String propName = "hbase.tests.new.version.behavior"; 746 String v = System.getProperty(propName); 747 if (v != null) { 748 return Boolean.parseBoolean(v); 749 } 750 return false; 751 } 752 753 /** 754 * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This 755 * allows to specify this parameter on the command line. If not set, default is true. 
756 */ 757 public boolean isReadShortCircuitOn() { 758 final String propName = "hbase.tests.use.shortcircuit.reads"; 759 String readOnProp = System.getProperty(propName); 760 if (readOnProp != null) { 761 return Boolean.parseBoolean(readOnProp); 762 } else { 763 return conf.getBoolean(propName, false); 764 } 765 } 766 767 /** 768 * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings, 769 * including skipping the hdfs checksum checks. 770 */ 771 private void enableShortCircuit() { 772 if (isReadShortCircuitOn()) { 773 String curUser = System.getProperty("user.name"); 774 LOG.info("read short circuit is ON for user " + curUser); 775 // read short circuit, for hdfs 776 conf.set("dfs.block.local-path-access.user", curUser); 777 // read short circuit, for hbase 778 conf.setBoolean("dfs.client.read.shortcircuit", true); 779 // Skip checking checksum, for the hdfs client and the datanode 780 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true); 781 } else { 782 LOG.info("read short circuit is OFF"); 783 } 784 } 785 786 private String createDirAndSetProperty(final String property) { 787 return createDirAndSetProperty(property, property); 788 } 789 790 private String createDirAndSetProperty(final String relPath, String property) { 791 String path = getDataTestDir(relPath).toString(); 792 System.setProperty(property, path); 793 conf.set(property, path); 794 new File(path).mkdirs(); 795 LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); 796 return path; 797 } 798 799 /** 800 * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing. 801 */ 802 public void shutdownMiniDFSCluster() throws IOException { 803 if (this.dfsCluster != null) { 804 // The below throws an exception per dn, AsynchronousCloseException. 
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      // Point the default filesystem back at the local fs so later tests do not resolve paths
      // against the now-stopped DFS.
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All
   * other options will use default values, defined in {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * value can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the server wait until this exact number of
    // region servers are connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
941 * @param numMasters Master node number. 942 * @param numRegionServers Number of region servers. 943 * @return The mini HBase cluster created. 944 * @see #shutdownMiniHBaseCluster() 945 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 946 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 947 * @see #startMiniHBaseCluster(StartTestingClusterOption) 948 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 949 */ 950 @Deprecated 951 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers) 952 throws IOException, InterruptedException { 953 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 954 .numRegionServers(numRegionServers).build(); 955 return startMiniHBaseCluster(option); 956 } 957 958 /** 959 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 960 * {@link #startMiniCluster()}. All other options will use default values, defined in 961 * {@link StartTestingClusterOption.Builder}. 962 * @param numMasters Master node number. 963 * @param numRegionServers Number of region servers. 964 * @param rsPorts Ports that RegionServer should use. 965 * @return The mini HBase cluster created. 966 * @see #shutdownMiniHBaseCluster() 967 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 968 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 
969 * @see #startMiniHBaseCluster(StartTestingClusterOption) 970 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 971 */ 972 @Deprecated 973 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 974 List<Integer> rsPorts) throws IOException, InterruptedException { 975 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 976 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 977 return startMiniHBaseCluster(option); 978 } 979 980 /** 981 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 982 * {@link #startMiniCluster()}. All other options will use default values, defined in 983 * {@link StartTestingClusterOption.Builder}. 984 * @param numMasters Master node number. 985 * @param numRegionServers Number of region servers. 986 * @param rsPorts Ports that RegionServer should use. 987 * @param masterClass The class to use as HMaster, or null for default. 988 * @param rsClass The class to use as HRegionServer, or null for default. 989 * @param createRootDir Whether to create a new root or data directory path. 990 * @param createWALDir Whether to create a new WAL directory. 991 * @return The mini HBase cluster created. 992 * @see #shutdownMiniHBaseCluster() 993 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 994 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 995 * @see #startMiniHBaseCluster(StartTestingClusterOption) 996 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 997 */ 998 @Deprecated 999 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1000 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 1001 Class<? 
extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass, 1002 boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException { 1003 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 1004 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 1005 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 1006 return startMiniHBaseCluster(option); 1007 } 1008 1009 /** 1010 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 1011 * want to keep dfs/zk up and just stop/start hbase. 1012 * @param servers number of region servers 1013 */ 1014 public void restartHBaseCluster(int servers) throws IOException, InterruptedException { 1015 this.restartHBaseCluster(servers, null); 1016 } 1017 1018 public void restartHBaseCluster(int servers, List<Integer> ports) 1019 throws IOException, InterruptedException { 1020 StartTestingClusterOption option = 1021 StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); 1022 restartHBaseCluster(option); 1023 invalidateConnection(); 1024 } 1025 1026 public void restartHBaseCluster(StartTestingClusterOption option) 1027 throws IOException, InterruptedException { 1028 closeConnection(); 1029 this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(), 1030 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 1031 option.getMasterClass(), option.getRsClass()); 1032 // Don't leave here till we've done a successful scan of the hbase:meta 1033 Connection conn = ConnectionFactory.createConnection(this.conf); 1034 Table t = conn.getTable(TableName.META_TABLE_NAME); 1035 ResultScanner s = t.getScanner(new Scan()); 1036 while (s.next() != null) { 1037 // do nothing 1038 } 1039 LOG.info("HBase has been restarted"); 1040 s.close(); 1041 t.close(); 1042 conn.close(); 1043 } 1044 1045 /** 1046 
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    // null is allowed through so callers can probe whether a cluster is up.
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException throws in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
   * startup.
   * You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note : Directory will be made irrespective of whether
   *               path has been fetched or not. If directory already exists, it will be overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
   * flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
   * this method if you were doing manual operation.
   * @return Fully qualified path to hbase root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  // Default hbase.fs.tmp.dir to a dir on the test filesystem when the conf does not set one.
  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    // Need at least a start key, an end key, and one split in between.
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param families the families
   * @param splitKeys the splitkeys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException throws IOException
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  /**
   * Create a pre-split table and wait for all regions to be assigned.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
    byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table.
   * @param htd table descriptor
   * @param families array of column families
   * @param splitKeys array of split keys
   * @param c Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }

  /**
   * Create a table.
   * @param htd table descriptor
   * @param families array of column families
   * @param splitKeys array of split keys
   * @param type Bloom type
   * @param blockSize block size
   * @param c Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    BloomType type, int blockSize, Configuration c) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setBloomFilterType(type).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfdb.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfdb.build());
    }
    TableDescriptor td = builder.build();
    if (splitKeys != null) {
      getAdmin().createTable(td, splitKeys);
    } else {
      getAdmin().createTable(td);
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(td.getTableName());
    return getConnection().getTable(td.getTableName());
  }

  /**
   * Create a table.
   * @param htd table descriptor
   * @param splitRows array of split keys
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    if (isNewVersionBehaviorEnabled()) {
      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        builder.setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
      }
    }
    if (splitRows != null) {
      getAdmin().createTable(builder.build(), splitRows);
    } else {
      getAdmin().createTable(builder.build());
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(htd.getTableName());
    return getConnection().getTable(htd.getTableName());
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param families the families
   * @param splitKeys the split keys
   * @param replicaCount the replica count
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount, final Configuration c) throws IOException {
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
    return createTable(htd, families, splitKeys, c);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
    return createTable(tableName, new byte[][] { family }, numVersions);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, (byte[][]) null);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[][] splitKeys) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    if (splitKeys != null) {
      getAdmin().createTable(builder.build(), splitKeys);
    } else {
      getAdmin().createTable(builder.build());
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setMaxVersions(numVersions).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table, optionally attaching a coprocessor.
   * @param cpName fully qualified coprocessor class name, or null for none
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
    String cpName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setMaxVersions(numVersions).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    if (cpName != null) {
      builder.setCoprocessor(cpName);
    }
    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    // numVersions[i] applies to families[i]; assumes the two arrays have the same length —
    // TODO(review) confirm callers guarantee this.
    int i = 0;
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
      i++;
    }
    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
    if (isNewVersionBehaviorEnabled()) {
      cfBuilder.setNewVersionBehavior(true);
    }
    builder.setColumnFamily(cfBuilder.build());
    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Set the number of Region replicas.
   */
  public static void setReplicas(Admin admin, TableName table, int replicaCount)
    throws IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc);
  }

  /**
   * Set the number of Region replicas.
   */
  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
    throws ExecutionException, IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc).get();
  }

  /**
   * Drop an existing table
   * @param tableName existing table
   */
  public void deleteTable(TableName tableName) throws IOException {
    try {
      getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException e) {
      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
    }
    getAdmin().deleteTable(tableName);
  }

  /**
   * Drop an existing table, ignoring the case where the table does not exist.
   * @param tableName existing table
   */
  public void deleteTableIfAny(TableName tableName) throws IOException {
    try {
      deleteTable(tableName);
    } catch (TableNotFoundException e) {
      // ignore
    }
  }

  // ==========================================================================
  // Canned table and table descriptor creation

  // Canned column families shared by many tests.
  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
  private static final int MAXVERSIONS = 3;

  public static final char FIRST_CHAR = 'a';
  public static final char LAST_CHAR = 'z';
  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);

  /**
   * Create a modifyable descriptor for the named table with the canned families and default
   * min/max versions, TTL and keep-deleted settings.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
    return createModifyableTableDescriptor(TableName.valueOf(name),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }

  /**
   * Create an immutable descriptor with the canned families (fam1-3), block cache disabled, and
   * the given versioning/TTL/keep-deleted settings.
   */
  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
        .setBlockCacheEnabled(false).setTimeToLive(ttl);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder.build();
  }

  /**
   * Same as {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} but returns
   * the still-modifyable builder instead of a built descriptor.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
        .setBlockCacheEnabled(false).setTimeToLive(ttl);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder;
  }

  /**
   * Create a table
of name <code>name</code>. 1666 * @param name Name to give table. 1667 * @return Column descriptor. 1668 */ 1669 public TableDescriptor createTableDescriptor(final TableName name) { 1670 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1671 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1672 } 1673 1674 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1675 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1676 } 1677 1678 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1679 int maxVersions) { 1680 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1681 for (byte[] family : families) { 1682 ColumnFamilyDescriptorBuilder cfBuilder = 1683 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1684 if (isNewVersionBehaviorEnabled()) { 1685 cfBuilder.setNewVersionBehavior(true); 1686 } 1687 builder.setColumnFamily(cfBuilder.build()); 1688 } 1689 return builder.build(); 1690 } 1691 1692 /** 1693 * Create an HRegion that writes to the local tmp dirs 1694 * @param desc a table descriptor indicating which table the region belongs to 1695 * @param startKey the start boundary of the region 1696 * @param endKey the end boundary of the region 1697 * @return a region that writes to local dir for testing 1698 */ 1699 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1700 throws IOException { 1701 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1702 .setEndKey(endKey).build(); 1703 return createLocalHRegion(hri, desc); 1704 } 1705 1706 /** 1707 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call 1708 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it. 
1709 */ 1710 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1711 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1712 } 1713 1714 /** 1715 * Create an HRegion that writes to the local tmp dirs with specified wal 1716 * @param info regioninfo 1717 * @param conf configuration 1718 * @param desc table descriptor 1719 * @param wal wal for this region. 1720 * @return created hregion 1721 */ 1722 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1723 WAL wal) throws IOException { 1724 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 1725 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1726 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1727 } 1728 1729 /** 1730 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} 1731 * when done. 1732 */ 1733 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1734 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1735 throws IOException { 1736 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1737 durability, wal, null, families); 1738 } 1739 1740 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1741 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1742 boolean[] compactedMemStore, byte[]... 
families) throws IOException { 1743 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1744 builder.setReadOnly(isReadOnly); 1745 int i = 0; 1746 for (byte[] family : families) { 1747 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1748 if (compactedMemStore != null && i < compactedMemStore.length) { 1749 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1750 } else { 1751 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1752 1753 } 1754 i++; 1755 // Set default to be three versions. 1756 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1757 builder.setColumnFamily(cfBuilder.build()); 1758 } 1759 builder.setDurability(durability); 1760 RegionInfo info = 1761 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1762 return createLocalHRegion(info, conf, builder.build(), wal); 1763 } 1764 1765 // 1766 // ========================================================================== 1767 1768 /** 1769 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1770 * read. 1771 * @param tableName existing table 1772 * @return HTable to that new table 1773 */ 1774 public Table deleteTableData(TableName tableName) throws IOException { 1775 Table table = getConnection().getTable(tableName); 1776 Scan scan = new Scan(); 1777 ResultScanner resScan = table.getScanner(scan); 1778 for (Result res : resScan) { 1779 Delete del = new Delete(res.getRow()); 1780 table.delete(del); 1781 } 1782 resScan = table.getScanner(scan); 1783 resScan.close(); 1784 return table; 1785 } 1786 1787 /** 1788 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1789 * table. 1790 * @param tableName table which must exist. 
1791 * @param preserveRegions keep the existing split points 1792 * @return HTable for the new table 1793 */ 1794 public Table truncateTable(final TableName tableName, final boolean preserveRegions) 1795 throws IOException { 1796 Admin admin = getAdmin(); 1797 if (!admin.isTableDisabled(tableName)) { 1798 admin.disableTable(tableName); 1799 } 1800 admin.truncateTable(tableName, preserveRegions); 1801 return getConnection().getTable(tableName); 1802 } 1803 1804 /** 1805 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1806 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not 1807 * preserve regions of existing table. 1808 * @param tableName table which must exist. 1809 * @return HTable for the new table 1810 */ 1811 public Table truncateTable(final TableName tableName) throws IOException { 1812 return truncateTable(tableName, false); 1813 } 1814 1815 /** 1816 * Load table with rows from 'aaa' to 'zzz'. 1817 * @param t Table 1818 * @param f Family 1819 * @return Count of rows loaded. 1820 */ 1821 public int loadTable(final Table t, final byte[] f) throws IOException { 1822 return loadTable(t, new byte[][] { f }); 1823 } 1824 1825 /** 1826 * Load table with rows from 'aaa' to 'zzz'. 1827 * @param t Table 1828 * @param f Family 1829 * @return Count of rows loaded. 1830 */ 1831 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1832 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1833 } 1834 1835 /** 1836 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1837 * @param t Table 1838 * @param f Array of Families to load 1839 * @return Count of rows loaded. 1840 */ 1841 public int loadTable(final Table t, final byte[][] f) throws IOException { 1842 return loadTable(t, f, null); 1843 } 1844 1845 /** 1846 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 
1847 * @param t Table 1848 * @param f Array of Families to load 1849 * @param value the values of the cells. If null is passed, the row key is used as value 1850 * @return Count of rows loaded. 1851 */ 1852 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1853 return loadTable(t, f, value, true); 1854 } 1855 1856 /** 1857 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1858 * @param t Table 1859 * @param f Array of Families to load 1860 * @param value the values of the cells. If null is passed, the row key is used as value 1861 * @return Count of rows loaded. 1862 */ 1863 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1864 throws IOException { 1865 List<Put> puts = new ArrayList<>(); 1866 for (byte[] row : HBaseTestingUtil.ROWS) { 1867 Put put = new Put(row); 1868 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1869 for (int i = 0; i < f.length; i++) { 1870 byte[] value1 = value != null ? 
value : row; 1871 put.addColumn(f[i], f[i], value1); 1872 } 1873 puts.add(put); 1874 } 1875 t.put(puts); 1876 return puts.size(); 1877 } 1878 1879 /** 1880 * A tracker for tracking and validating table rows generated with 1881 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1882 */ 1883 public static class SeenRowTracker { 1884 int dim = 'z' - 'a' + 1; 1885 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1886 byte[] startRow; 1887 byte[] stopRow; 1888 1889 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1890 this.startRow = startRow; 1891 this.stopRow = stopRow; 1892 } 1893 1894 void reset() { 1895 for (byte[] row : ROWS) { 1896 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1897 } 1898 } 1899 1900 int i(byte b) { 1901 return b - 'a'; 1902 } 1903 1904 public void addRow(byte[] row) { 1905 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1906 } 1907 1908 /** 1909 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1910 * rows none 1911 */ 1912 public void validate() { 1913 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1914 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1915 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1916 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1917 int expectedCount = 0; 1918 if ( 1919 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1920 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1921 ) { 1922 expectedCount = 1; 1923 } 1924 if (count != expectedCount) { 1925 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1926 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1927 + "instead of " + expectedCount); 1928 } 1929 } 1930 } 1931 } 1932 } 1933 } 1934 1935 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1936 return loadRegion(r, f, false); 1937 } 1938 1939 public int loadRegion(final Region r, final byte[] f) throws IOException { 1940 return loadRegion((HRegion) r, f); 
1941 } 1942 1943 /** 1944 * Load region with rows from 'aaa' to 'zzz'. 1945 * @param r Region 1946 * @param f Family 1947 * @param flush flush the cache if true 1948 * @return Count of rows loaded. 1949 */ 1950 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1951 byte[] k = new byte[3]; 1952 int rowCount = 0; 1953 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1954 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1955 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1956 k[0] = b1; 1957 k[1] = b2; 1958 k[2] = b3; 1959 Put put = new Put(k); 1960 put.setDurability(Durability.SKIP_WAL); 1961 put.addColumn(f, null, k); 1962 if (r.getWAL() == null) { 1963 put.setDurability(Durability.SKIP_WAL); 1964 } 1965 int preRowCount = rowCount; 1966 int pause = 10; 1967 int maxPause = 1000; 1968 while (rowCount == preRowCount) { 1969 try { 1970 r.put(put); 1971 rowCount++; 1972 } catch (RegionTooBusyException e) { 1973 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1974 Threads.sleep(pause); 1975 } 1976 } 1977 } 1978 } 1979 if (flush) { 1980 r.flush(true); 1981 } 1982 } 1983 return rowCount; 1984 } 1985 1986 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1987 throws IOException { 1988 for (int i = startRow; i < endRow; i++) { 1989 byte[] data = Bytes.toBytes(String.valueOf(i)); 1990 Put put = new Put(data); 1991 put.addColumn(f, null, data); 1992 t.put(put); 1993 } 1994 } 1995 1996 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1997 throws IOException { 1998 for (int i = 0; i < totalRows; i++) { 1999 byte[] row = new byte[rowSize]; 2000 Bytes.random(row); 2001 Put put = new Put(row); 2002 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 2003 t.put(put); 2004 } 2005 } 2006 2007 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 2008 int replicaId) throws IOException { 2009 for (int i = startRow; i < endRow; i++) { 2010 String failMsg = 
"Failed verification of row :" + i; 2011 byte[] data = Bytes.toBytes(String.valueOf(i)); 2012 Get get = new Get(data); 2013 get.setReplicaId(replicaId); 2014 get.setConsistency(Consistency.TIMELINE); 2015 Result result = table.get(get); 2016 assertTrue(failMsg, result.containsColumn(f, null)); 2017 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2018 Cell cell = result.getColumnLatestCell(f, null); 2019 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2020 cell.getValueOffset(), cell.getValueLength())); 2021 } 2022 } 2023 2024 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2025 throws IOException { 2026 verifyNumericRows((HRegion) region, f, startRow, endRow); 2027 } 2028 2029 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2030 throws IOException { 2031 verifyNumericRows(region, f, startRow, endRow, true); 2032 } 2033 2034 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2035 final boolean present) throws IOException { 2036 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2037 } 2038 2039 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2040 final boolean present) throws IOException { 2041 for (int i = startRow; i < endRow; i++) { 2042 String failMsg = "Failed verification of row :" + i; 2043 byte[] data = Bytes.toBytes(String.valueOf(i)); 2044 Result result = region.get(new Get(data)); 2045 2046 boolean hasResult = result != null && !result.isEmpty(); 2047 assertEquals(failMsg + result, present, hasResult); 2048 if (!present) continue; 2049 2050 assertTrue(failMsg, result.containsColumn(f, null)); 2051 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2052 Cell cell = result.getColumnLatestCell(f, null); 2053 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2054 cell.getValueOffset(), 
cell.getValueLength())); 2055 } 2056 } 2057 2058 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2059 throws IOException { 2060 for (int i = startRow; i < endRow; i++) { 2061 byte[] data = Bytes.toBytes(String.valueOf(i)); 2062 Delete delete = new Delete(data); 2063 delete.addFamily(f); 2064 t.delete(delete); 2065 } 2066 } 2067 2068 /** 2069 * Return the number of rows in the given table. 2070 * @param table to count rows 2071 * @return count of rows 2072 */ 2073 public static int countRows(final Table table) throws IOException { 2074 return countRows(table, new Scan()); 2075 } 2076 2077 public static int countRows(final Table table, final Scan scan) throws IOException { 2078 try (ResultScanner results = table.getScanner(scan)) { 2079 int count = 0; 2080 while (results.next() != null) { 2081 count++; 2082 } 2083 return count; 2084 } 2085 } 2086 2087 public static int countRows(final Table table, final byte[]... families) throws IOException { 2088 Scan scan = new Scan(); 2089 for (byte[] family : families) { 2090 scan.addFamily(family); 2091 } 2092 return countRows(table, scan); 2093 } 2094 2095 /** 2096 * Return the number of rows in the given table. 
2097 */ 2098 public int countRows(final TableName tableName) throws IOException { 2099 try (Table table = getConnection().getTable(tableName)) { 2100 return countRows(table); 2101 } 2102 } 2103 2104 public static int countRows(final Region region) throws IOException { 2105 return countRows(region, new Scan()); 2106 } 2107 2108 public static int countRows(final Region region, final Scan scan) throws IOException { 2109 try (InternalScanner scanner = region.getScanner(scan)) { 2110 return countRows(scanner); 2111 } 2112 } 2113 2114 public static int countRows(final InternalScanner scanner) throws IOException { 2115 int scannedCount = 0; 2116 List<Cell> results = new ArrayList<>(); 2117 boolean hasMore = true; 2118 while (hasMore) { 2119 hasMore = scanner.next(results); 2120 scannedCount += results.size(); 2121 results.clear(); 2122 } 2123 return scannedCount; 2124 } 2125 2126 /** 2127 * Return an md5 digest of the entire contents of a table. 2128 */ 2129 public String checksumRows(final Table table) throws Exception { 2130 MessageDigest digest = MessageDigest.getInstance("MD5"); 2131 try (ResultScanner results = table.getScanner(new Scan())) { 2132 for (Result res : results) { 2133 digest.update(res.getRow()); 2134 } 2135 } 2136 return digest.toString(); 2137 } 2138 2139 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2140 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2141 static { 2142 int i = 0; 2143 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2144 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2145 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2146 ROWS[i][0] = b1; 2147 ROWS[i][1] = b2; 2148 ROWS[i][2] = b3; 2149 i++; 2150 } 2151 } 2152 } 2153 } 2154 2155 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2156 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2157 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), 
Bytes.toBytes("jjj"), 2158 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2159 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2160 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2161 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2162 2163 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2164 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2165 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2166 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2167 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2168 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2169 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2170 2171 /** 2172 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2173 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 
 * @return list of region info for regions added to meta
 */
public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
  final TableDescriptor htd, byte[][] startKeys) throws IOException {
  try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
    // Mark the table ENABLED in meta so the regions we insert are considered live.
    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
      TableState.State.ENABLED);
    // add custom ones; each region's end key is the next start key, and the (i + 1) % length
    // wrap makes the last region end at startKeys[0] -- an empty first key therefore yields a
    // properly unbounded last region.
    for (int i = 0; i < startKeys.length; i++) {
      int j = (i + 1) % startKeys.length;
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
        .setEndKey(startKeys[j]).build();
      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
      newRegions.add(hri);
    }
    return newRegions;
  }
}

/**
 * Create an unmanaged WAL. Be sure to close it when you're through.
 */
public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
  throws IOException {
  // The WAL subsystem will use the default rootDir rather than the passed in rootDir
  // unless I pass along via the conf.
  Configuration confForWAL = new Configuration(conf);
  CommonFSUtils.setRootDir(confForWAL, rootDir);
  // Random suffix keeps concurrently created WAL factories from colliding on a name.
  return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
    .getWAL(hri);
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd) throws IOException {
  return createRegionAndWAL(info, rootDir, conf, htd, true);
}

/**
 * Create a region with its own WAL and the given block cache. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
  // Create uninitialized so the block cache can be injected before region initialization.
  HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
  region.setBlockCache(blockCache);
  region.initialize();
  return region;
}

/**
 * Create a region with its own WAL and the given mob file cache. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
  throws IOException {
  // Create uninitialized so the mob file cache can be injected before region initialization.
  HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
  region.setMobFileCache(mobFileCache);
  region.initialize();
  return region;
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2245 */ 2246 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2247 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2248 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2249 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2250 WAL wal = createWal(conf, rootDir, info); 2251 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2252 } 2253 2254 /** 2255 * Find any other region server which is different from the one identified by parameter 2256 * @return another region server 2257 */ 2258 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2259 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2260 if (!(rst.getRegionServer() == rs)) { 2261 return rst.getRegionServer(); 2262 } 2263 } 2264 return null; 2265 } 2266 2267 /** 2268 * Tool to get the reference to the region server object that holds the region of the specified 2269 * user table. 
2270 * @param tableName user table to lookup in hbase:meta 2271 * @return region server that holds it, null if the row doesn't exist 2272 */ 2273 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2274 throws IOException, InterruptedException { 2275 List<RegionInfo> regions = getAdmin().getRegions(tableName); 2276 if (regions == null || regions.isEmpty()) { 2277 return null; 2278 } 2279 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2280 2281 byte[] firstRegionName = 2282 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2283 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2284 2285 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2286 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2287 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2288 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2289 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2290 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2291 while (retrier.shouldRetry()) { 2292 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2293 if (index != -1) { 2294 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2295 } 2296 // Came back -1. Region may not be online yet. Sleep a while. 2297 retrier.sleepUntilNextRetry(); 2298 } 2299 return null; 2300 } 2301 2302 /** 2303 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2304 * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple 2305 * MiniMRCluster instances with different log dirs. 2306 * @throws IOException When starting the cluster fails. 
2307 */ 2308 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2309 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 2310 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2311 "99.0"); 2312 startMiniMapReduceCluster(2); 2313 return mrCluster; 2314 } 2315 2316 /** 2317 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2318 * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start 2319 * multiple MiniMRCluster instances with different log dirs. 2320 * @param servers The number of <code>TaskTracker</code>'s to start. 2321 * @throws IOException When starting the cluster fails. 2322 */ 2323 private void startMiniMapReduceCluster(final int servers) throws IOException { 2324 if (mrCluster != null) { 2325 throw new IllegalStateException("MiniMRCluster is already running"); 2326 } 2327 LOG.info("Starting mini mapreduce cluster..."); 2328 setupClusterTestDir(); 2329 createDirsAndSetProperties(); 2330 2331 //// hadoop2 specific settings 2332 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2333 // we up the VM usable so that processes don't get killed. 2334 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2335 2336 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2337 // this avoids the problem by disabling speculative task execution in tests. 2338 conf.setBoolean("mapreduce.map.speculative", false); 2339 conf.setBoolean("mapreduce.reduce.speculative", false); 2340 //// 2341 2342 // Yarn container runs in independent JVM. We need to pass the argument manually here if the 2343 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 
2344 if (JVM.getJVMSpecVersion() >= 17) { 2345 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2346 conf.set("yarn.app.mapreduce.am.command-opts", 2347 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2348 } 2349 2350 // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem 2351 // instance and cause trouble in HBase. See HBASE-29802 for more details. 2352 JobConf mrClusterConf = new JobConf(conf); 2353 mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true); 2354 // Allow the user to override FS URI for this map-reduce cluster to use. 2355 mrCluster = 2356 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2357 1, null, null, mrClusterConf); 2358 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2359 if (jobConf == null) { 2360 jobConf = mrCluster.createJobConf(); 2361 } 2362 2363 // Hadoop MiniMR overwrites this while it should not 2364 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2365 LOG.info("Mini mapreduce cluster started"); 2366 2367 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2368 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2369 // necessary config properties here. YARN-129 required adding a few properties. 
    // NOTE(review): this is the tail of a method whose start is above this chunk — it copies the
    // endpoints the just-started MiniMRCluster picked into this utility's conf so that jobs
    // submitted through this conf reach the mini cluster.
    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
    // this for mrv2 support; mr1 ignores this
    conf.set("mapreduce.framework.name", "yarn");
    conf.setBoolean("yarn.is.minicluster", true);
    String rmAddress = jobConf.get("yarn.resourcemanager.address");
    if (rmAddress != null) {
      conf.set("yarn.resourcemanager.address", rmAddress);
    }
    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
    if (historyAddress != null) {
      conf.set("mapreduce.jobhistory.address", historyAddress);
    }
    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
    if (schedulerAddress != null) {
      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
    }
    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
    if (mrJobHistoryWebappAddress != null) {
      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
    }
    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
    if (yarnRMWebappAddress != null) {
      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
    }
  }

  /**
   * Stops the previously started <code>MiniMRCluster</code>. Safe to call when no cluster is
   * running (no-op apart from restoring the local-jobtracker setting).
   */
  public void shutdownMiniMapReduceCluster() {
    if (mrCluster != null) {
      LOG.info("Stopping mini mapreduce cluster...");
      mrCluster.shutdown();
      mrCluster = null;
      LOG.info("Mini mapreduce cluster stopped");
    }
    // Restore configuration to point to local jobtracker
    conf.set("mapreduce.jobtracker.address", "local");
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS.
   */
  public RegionServerServices createMockRegionServerService() throws IOException {
    return createMockRegionServerService((ServerName) null);
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
   * TestTokenAuthentication
   * @param rpc RPC server the mock services should expose; stored as-is (may be null)
   */
  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
    throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
    rss.setFileSystem(getTestFileSystem());
    rss.setRpcServer(rpc);
    return rss;
  }

  /**
   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
   * TestOpenRegionHandler
   * @param name server name to report; may be null
   */
  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
    rss.setFileSystem(getTestFileSystem());
    return rss;
  }

  /**
   * Expire the Master's ZooKeeper session.
   */
  public void expireMasterSession() throws Exception {
    HMaster master = getMiniHBaseCluster().getMaster();
    expireSession(master.getZooKeeper(), false);
  }

  /**
   * Expire a region server's ZooKeeper session.
   * @param index which RS
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
    expireSession(rs.getZooKeeper(), false);
    decrementMinRegionServerCount();
  }

  // Lower the minimum-regionserver-count everywhere it is cached, so a master restarted after we
  // expired an RS session does not wait for a server count we can no longer reach.
  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for newly spawned master
    // this.hbaseCluster shares this configuration too
    decrementMinRegionServerCount(getConfiguration());

    // each master thread keeps a copy of configuration
    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
      decrementMinRegionServerCount(master.getMaster().getConfiguration());
    }
  }

  // Decrement WAIT_ON_REGIONSERVERS_MINTOSTART in the given conf, never going below 1; a value of
  // -1 (unset) is left untouched.
  private void decrementMinRegionServerCount(Configuration conf) {
    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    if (currentCount != -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
    }
  }

  /** Expire the given watcher's session without checking cluster status afterwards. */
  public void expireSession(ZKWatcher nodeZK) throws Exception {
    expireSession(nodeZK, false);
  }

  /**
   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
   * http://hbase.apache.org/book.html#trouble.zookeeper
   * <p/>
   * There are issues when doing this:
   * <ol>
   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
   * </ol>
   * @param nodeZK - the ZK watcher to expire
   * @param checkStatus - true to check if we can create a Table with the current configuration.
   */
  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
    // Reusing the victim's session id + password is what lets the second connection below hijack
    // and then kill the session.
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();

    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so don't have to capture the event, just
    // closing the connection should be enough.
    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        LOG.info("Monitor ZKW received event=" + watchedEvent);
      }
    }, sessionID, password);

    // Making it expire
    ZooKeeper newZK =
      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);

    // ensure that we have connection to the server before closing down, otherwise
    // the close session event will be eaten out before we start CONNECTING state
    long start = EnvironmentEdgeManager.currentTime();
    while (
      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
    ) {
      Thread.sleep(1);
    }
    newZK.close();
    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));

    // Now closing & waiting to be sure that the clients get it.
    monitor.close();

    if (checkStatus) {
      // Smoke-test that a client can still reach meta with the current configuration.
      getConnection().getTable(TableName.META_TABLE_NAME).close();
    }
  }

  /**
   * Get the Mini HBase cluster.
   * @return hbase cluster
   * @see #getHBaseClusterInterface()
   */
  public SingleProcessHBaseCluster getHBaseCluster() {
    return getMiniHBaseCluster();
  }

  /**
   * Returns the HBaseCluster instance.
   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
   * instead w/o the need to type-cast.
   */
  public HBaseClusterInterface getHBaseClusterInterface() {
    // implementation note: we should rename this method as #getHBaseCluster(),
    // but this would require refactoring 90+ calls.
    return hbaseCluster;
  }

  /**
   * Resets the connections so that the next time getConnection() is called, a new connection is
   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
   * the connection is not valid anymore.
   * <p/>
   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
   * written, not all start() stop() calls go through this class. Most tests directly operate on the
   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
   * the connection state automatically. Cleaning this is a much bigger refactor.
   */
  public void invalidateConnection() throws IOException {
    closeConnection();
    // Update the master addresses if they changed.
    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
      masterConfigBefore, masterConfAfter);
    conf.set(HConstants.MASTER_ADDRS_KEY,
      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
  }

  /**
   * Get a shared Connection to the cluster. this method is thread safe.
   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
   */
  public Connection getConnection() throws IOException {
    return getAsyncConnection().toConnection();
  }

  /**
   * Get a assigned Connection to the cluster. this method is thread safe.
   * @param user assigned user
   * @return A Connection with assigned user.
   */
  public Connection getConnection(User user) throws IOException {
    return getAsyncConnection(user).toConnection();
  }

  /**
   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
   *         of cluster.
   */
  public AsyncClusterConnection getAsyncConnection() throws IOException {
    try {
      // updateAndGet makes the lazy initialization atomic; the checked IOException must be
      // smuggled out of the lambda as UncheckedIOException and unwrapped below.
      return asyncConnection.updateAndGet(connection -> {
        if (connection == null) {
          try {
            User user = UserProvider.instantiate(conf).getCurrent();
            connection = getAsyncConnection(user);
          } catch (IOException ioe) {
            throw new UncheckedIOException("Failed to create connection", ioe);
          }
        }
        return connection;
      });
    } catch (UncheckedIOException exception) {
      throw exception.getCause();
    }
  }

  /**
   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
   * @param user assigned user
   * @return An AsyncClusterConnection with assigned user.
   */
  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
  }

  /** Close the shared admin and async connection, swallowing (but logging) close errors. */
  public void closeConnection() throws IOException {
    if (hbaseAdmin != null) {
      Closeables.close(hbaseAdmin, true);
      hbaseAdmin = null;
    }
    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
    if (asyncConnection != null) {
      Closeables.close(asyncConnection, true);
    }
  }

  /**
   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
   * it has no effect, it will be closed automatically when the cluster shutdowns
   */
  public Admin getAdmin() throws IOException {
    if (hbaseAdmin == null) {
      this.hbaseAdmin = getConnection().getAdmin();
    }
    return hbaseAdmin;
  }

  // Lazily-created shared Admin; reset by closeConnection().
  private Admin hbaseAdmin = null;

  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
   */
  public Hbck getHbck() throws IOException {
    return getConnection().getHbck();
  }

  /**
   * Unassign the named region.
   * @param regionName The region to unassign.
   */
  public void unassignRegion(String regionName) throws IOException {
    unassignRegion(Bytes.toBytes(regionName));
  }

  /**
   * Unassign the named region.
   * @param regionName The region to unassign.
   */
  public void unassignRegion(byte[] regionName) throws IOException {
    getAdmin().unassign(regionName);
  }

  /**
   * Unassigns the region containing the given row.
   * @param row The row to find the containing region.
   * @param table The table to find the region.
   */
  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
    unassignRegionByRow(Bytes.toBytes(row), table);
  }

  /**
   * Unassigns the region containing the given row.
   * @param row The row to find the containing region.
   * @param table The table to find the region.
   */
  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
    HRegionLocation hrl = table.getRegionLocation(row);
    unassignRegion(hrl.getRegion().getRegionName());
  }

  /**
   * Retrieves a splittable region randomly from tableName
   * @param tableName name of table
   * @param maxAttempts maximum number of attempts, unlimited for value of -1
   * @return the HRegion chosen, null if none was found within limit of maxAttempts
   */
  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
    int regCount = regions.size();
    Set<Integer> attempted = new HashSet<>();
    int idx;
    int attempts = 0;
    do {
      regions = getHBaseCluster().getRegions(tableName);
      if (regCount != regions.size()) {
        // if there was region movement, clear attempted Set
        attempted.clear();
      }
      regCount = regions.size();
      // There are chances that before we get the region for the table from an RS the region may
      // be going for CLOSE. This may be because online schema change is enabled
      if (regCount > 0) {
        idx = ThreadLocalRandom.current().nextInt(regCount);
        // if we have just tried this region, there is no need to try again
        if (attempted.contains(idx)) {
          continue;
        }
        HRegion region = regions.get(idx);
        if (region.checkSplit().isPresent()) {
          return region;
        }
        attempted.add(idx);
      }
      attempts++;
    } while (maxAttempts == -1 || attempts < maxAttempts);
    return null;
  }

  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }

  /** Set the MiniDFSCluster, requiring that any existing cluster is already down. */
  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
    setDFSCluster(cluster, true);
  }

  /**
   * Set the MiniDFSCluster
   * @param cluster cluster to use
   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
   *                    is set.
   * @throws IllegalStateException if the passed cluster is up when it is required to be down
   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
   */
  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
    throws IllegalStateException, IOException {
    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
    }
    this.dfsCluster = cluster;
    this.setFs();
  }

  public FileSystem getTestFileSystem() throws IOException {
    return HFileSystem.get(conf);
  }

  /**
   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
   * (30 seconds).
   * @param table Table to wait on.
   */
  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
    waitTableAvailable(table.getName(), 30000);
  }

  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }

  /**
   * Wait until all regions in a table have been assigned
   * @param table Table to wait on.
   * @param timeoutMillis Timeout.
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
  }

  /**
   * Build a human-readable explanation of why a table is (or is not) available, comparing meta
   * locations against the master's in-memory assignments. Used as the failure message of the
   * wait predicates above.
   */
  public String explainTableAvailability(TableName tableName) throws IOException {
    StringBuilder msg =
      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
    if (getHBaseCluster().getMaster().isAlive()) {
      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
        .getRegionStates().getRegionAssignments();
      final List<Pair<RegionInfo, ServerName>> metaLocations =
        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
        RegionInfo hri = metaLocation.getFirst();
        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM ( ").append(sn)
            .append(" <> ").append(assignments.get(hri));
        }
      }
    }
    return msg.toString();
  }

  /**
   * Explain whether the table's state in hbase:meta matches the expected state.
   * @return a short diagnostic string; "TableState in META: OK" when the state matches.
   */
  public String explainTableState(final TableName table, TableState.State state)
    throws IOException {
    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
    if (tableState == null) {
      return "TableState in META: No table state in META for table " + table
        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
    } else if (!tableState.inStates(state)) {
      return "TableState in META: Not " + state + " state, but " + tableState;
    } else {
      return "TableState in META: OK";
    }
  }

  /**
   * Scan meta (including deleted rows) and return the last table state seen for the given table,
   * or null if none was found.
   */
  @Nullable
  public TableState findLastTableState(final TableName table) throws IOException {
    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result r) throws IOException {
        // Stop scanning as soon as we leave this table's rows.
        if (!Arrays.equals(r.getRow(), table.getName())) {
          return false;
        }
        TableState state = CatalogFamilyFormat.getTableState(r);
        if (state != null) {
          lastTableState.set(state);
        }
        return true;
      }
    };
    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
      Integer.MAX_VALUE, visitor);
    return lastTableState.get();
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
   * table.
   * @param table the table to wait on.
   * @throws InterruptedException if interrupted while waiting
   * @throws IOException if an IO problem is encountered
   */
  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
    waitTableEnabled(table, 30000);
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
   * have been all assigned.
   * @see #waitTableEnabled(TableName, long)
   * @param table Table to wait on.
   * @param timeoutMillis Time to wait on it being marked enabled.
   */
  public void waitTableEnabled(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
  }

  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
    waitFor(timeoutMillis, predicateTableEnabled(table));
  }

  /**
   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
   * after default period (30 seconds)
   * @param table Table to wait on.
   */
  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
    waitTableDisabled(table, 30000);
  }

  public void waitTableDisabled(TableName table, long millisTimeout)
    throws InterruptedException, IOException {
    waitFor(millisTimeout, predicateTableDisabled(table));
  }

  /**
   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
   * @param table Table to wait on.
   * @param timeoutMillis Time to wait on it being marked disabled.
   */
  public void waitTableDisabled(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
  }

  /**
   * Make sure that at least the specified number of region servers are running
   * @param num minimum number of region servers that should be running
   * @return true if we started some servers
   */
  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
    boolean startedServer = false;
    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
      startedServer = true;
    }

    return startedServer;
  }

  /**
   * Make sure that at least the specified number of region servers are running. We don't count the
   * ones that are currently stopping or are stopped.
   * @param num minimum number of region servers that should be running
   * @return true if we started some servers
   */
  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
    // First top up based on the live-thread count, then re-check excluding stopping/stopped
    // servers and top up again if needed.
    boolean startedServer = ensureSomeRegionServersAvailable(num);

    int nonStoppedServers = 0;
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {

      HRegionServer hrs = rst.getRegionServer();
      if (hrs.isStopping() || hrs.isStopped()) {
        LOG.info("A region server is stopped or stopping:" + hrs);
      } else {
        nonStoppedServers++;
      }
    }
    for (int i = nonStoppedServers; i < num; ++i) {
      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
      startedServer = true;
    }
    return startedServer;
  }

  /**
   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
   * @param c Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new configuration instance with a different user set into it.
   */
  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
    throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
      return User.getCurrent();
    }
    // Else distributed filesystem. Make a new instance per daemon. Below
    // code is taken from the AppendTestUtil over in hdfs.
    String username = User.getCurrent().getName() + differentiatingSuffix;
    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
    return user;
  }

  /**
   * Collect the names of all regions currently online on live region servers of the given cluster.
   * Servers that stopped mid-query are skipped.
   */
  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      try {
        for (RegionInfo region : ProtobufUtil
          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
          online.add(region.getRegionNameAsString());
        }
      } catch (RegionServerStoppedException e) {
        // That's fine.
      }
    }
    return online;
  }

  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
   * linger. Here is the exception you'll see:
   *
   * <pre>
   * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block
   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
   * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683
   * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
   * </pre>
   *
   * @param stream A DFSClient.DFSOutputStream.
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
    try {
      // The field is on a private nested class of DFSClient, so it has to be reached by
      // reflection; failures are logged and ignored (best effort).
      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz : clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      LOG.info("Could not set max recovery field", e);
    }
  }

  /**
   * Uses directly the assignment manager to assign the region. and waits until the specified region
   * has completed assignment.
   * @return true if the region is assigned false otherwise.
   */
  public boolean assignRegion(final RegionInfo regionInfo)
    throws IOException, InterruptedException {
    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
    am.assign(regionInfo);
    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
  }

  /**
   * Move region to destination server and wait till region is completely moved and online
   * @param destRegion region to move
   * @param destServer destination server of the region
   */
  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
    throws InterruptedException, IOException {
    HMaster master = getMiniHBaseCluster().getMaster();
    // TODO: Here we start the move. The move can take a while.
    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
    while (true) {
      ServerName serverName =
        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
      if (serverName != null && serverName.equals(destServer)) {
        assertRegionOnServer(destRegion, serverName, 2000);
        break;
      }
      Thread.sleep(10);
    }
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds) This means all regions have been deployed,
   * master has been informed and updated hbase:meta with the regions deployed server.
   * @param tableName the table name
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
    waitUntilAllRegionsAssigned(tableName,
      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
  }

  /**
   * Wait until all system table's regions get assigned
   */
  public void waitUntilAllSystemRegionsAssigned() throws IOException {
    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, master has been informed and updated
   * hbase:meta with the regions deployed server.
   * @param tableName the table name
   * @param timeout timeout, in milliseconds
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
    // For meta itself the catalog scan below does not apply; only the AM check at the end runs.
    if (!TableName.isMetaTableName(tableName)) {
      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
          + timeout + "ms");
        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
          @Override
          public String explainFailure() throws IOException {
            return explainTableAvailability(tableName);
          }

          @Override
          public boolean evaluate() throws IOException {
            Scan scan = new Scan();
            scan.addFamily(HConstants.CATALOG_FAMILY);
            boolean tableFound = false;
            try (ResultScanner s = meta.getScanner(scan)) {
              for (Result r; (r = s.next()) != null;) {
                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                RegionInfo info = RegionInfo.parseFromOrNull(b);
                if (info != null && info.getTable().equals(tableName)) {
                  // Get server hosting this region from catalog family. Return false if no server
                  // hosting this region, or if the server hosting this region was recently killed
                  // (for fault tolerance testing).
                  tableFound = true;
                  byte[] server =
                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
                  if (server == null) {
                    return false;
                  } else {
                    byte[] startCode =
                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
                    ServerName serverName =
                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
                        + Bytes.toLong(startCode));
                    if (
                      !getHBaseClusterInterface().isDistributedCluster()
                        && getHBaseCluster().isKilledRS(serverName)
                    ) {
                      return false;
                    }
                  }
                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
                    return false;
                  }
                }
              }
            }
            if (!tableFound) {
              LOG.warn(
                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
            }
            return tableFound;
          }
        });
      }
    }
    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
    // check from the master state if we are using a mini cluster
    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table but make sure master knows of the assignments before
      // returning -- sometimes this can lag.
      HMaster master = getHBaseCluster().getMaster();
      final RegionStates states = master.getAssignmentManager().getRegionStates();
      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
        @Override
        public String explainFailure() throws IOException {
          return explainTableAvailability(tableName);
        }

        @Override
        public boolean evaluate() throws IOException {
          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
          return hris != null && !hris.isEmpty();
        }
      });
    }
    LOG.info("All regions for table " + tableName + " assigned.");
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods of
   * querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
      // readpoint 0.
      0);

    List<Cell> result = new ArrayList<>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // verify that we are on the row we want:
      Cell kv = result.get(0);
      if (!CellUtil.matchingRows(kv, get.getRow())) {
        result.clear();
      }
    }
    scanner.close();
    return result;
  }

  /**
   * Create region split keys between startkey and endKey
   * @param numRegions the number of regions to be created. it has to be greater than 3.
   * @return resulting split keys
   */
  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
    assertTrue(numRegions > 3);
    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
    byte[][] result = new byte[tmpSplitKeys.length + 1][];
    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
    // First region always starts at the empty key.
    result[0] = HConstants.EMPTY_BYTE_ARRAY;
    return result;
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods of
   * querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
    throws IOException {
    Get get = new Get(row);
    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
    s.put(store.getColumnFamilyDescriptor().getName(), columns);

    return getFromStoreFile(store, get);
  }

  /**
   * Assert that two KeyValue lists are equal cell-by-cell (per CellComparator), throwing an
   * AssertionError that pinpoints the first differing position.
   * @param additionalMsg optional extra context appended to the failure message; may be null
   */
  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
    final List<? extends Cell> actual) {
    final int eLen = expected.size();
    final int aLen = actual.size();
    final int minLen = Math.min(eLen, aLen);

    int i = 0;
    while (
      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
    ) {
      i++;
    }

    if (additionalMsg == null) {
      additionalMsg = "";
    }
    if (!additionalMsg.isEmpty()) {
      additionalMsg = ". " + additionalMsg;
    }

    if (eLen != aLen || i != minLen) {
      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
        + " (length " + aLen + ")" + additionalMsg);
    }
  }

  /** Element at index i as a String, or a placeholder when i is out of range. */
  public static <T> String safeGetAsStr(List<T> lst, int i) {
    if (0 <= i && i < lst.size()) {
      return lst.get(i).toString();
    } else {
      return "<out_of_range>";
    }
  }

  public String getRpcConnnectionURI() throws UnknownHostException {
    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
  }

  public String getZkConnectionURI() {
    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }

  /**
   * Get the zk based cluster key for this cluster.
   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
   *             connection info of a cluster. Keep here only for compatibility.
   * @see #getRpcConnnectionURI()
   * @see #getZkConnectionURI()
   */
  @Deprecated
  public String getClusterKey() {
    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + ":"
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }

  /**
   * Creates a random table with the given parameters. The RNG is seeded from the table name, so
   * repeated runs with the same name produce the same data.
   */
  public Table createRandomTable(TableName tableName, final Collection<String> families,
    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
    final int numRowsPerFlush) throws IOException, InterruptedException {
    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
      + maxVersions + "\n");

    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
    final int numCF = families.size();
    final byte[][] cfBytes = new byte[numCF][];
    {
      int cfIndex = 0;
      for (String cf : families) {
        cfBytes[cfIndex++] = Bytes.toBytes(cf);
      }
    }

    final int actualStartKey = 0;
    final int actualEndKey = Integer.MAX_VALUE;
    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
    final int splitStartKey = actualStartKey + keysPerRegion;
    final int splitEndKey = actualEndKey - keysPerRegion;
    // Fixed-width hex keys so lexicographic and numeric order agree.
    final String keyFormat = "%08x";
    final Table table = createTable(tableName, cfBytes, maxVersions,
      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);

    if (hbaseCluster != null) {
      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
    }

    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);

    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
        final byte[] row = Bytes.toBytes(
          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));

        Put put = new Put(row);
        Delete del = new Delete(row);
        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
          final byte[] cf = cfBytes[rand.nextInt(numCF)];
          final long ts = rand.nextInt();
          final byte[] qual = Bytes.toBytes("col" + iCol);
          if (rand.nextBoolean()) {
            final byte[] value =
              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
            put.addColumn(cf, qual, ts, value);
          } else if (rand.nextDouble() < 0.8) {
            del.addColumn(cf, qual, ts);
          } else {
            del.addColumns(cf, qual, ts);
          }
        }

        if (!put.isEmpty()) {
          mutator.mutate(put);
        }

        if (!del.isEmpty()) {
          mutator.mutate(del);
        }
      }
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      mutator.flush();
      if (hbaseCluster != null) {
        getMiniHBaseCluster().flushcache(table.getName());
      }
    }
    mutator.close();

    return table;
  }

  public static int randomFreePort() {
    return HBaseCommonTestingUtil.randomFreePort();
  }

  public static String randomMultiCastAddress() {
    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
  }

  /**
   * Block until a TCP connect to host:port succeeds, retrying for up to 10 seconds; rethrows the
   * last connection failure if the server never became reachable.
   */
  public static void waitForHostPort(String host, int port) throws IOException {
    final int maxTimeMs = 10000;
    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
    IOException savedException = null;
    LOG.info("Waiting for server at " + host + ":" + port);
    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
      try {
        Socket sock = new Socket(InetAddress.getByName(host), port);
        sock.close();
        savedException = null;
        LOG.info("Server at " + host + ":" + port + " is available");
        break;
      } catch (UnknownHostException e) {
        // Not retryable: the hostname itself is bad.
        throw new IOException("Failed to look up " + host, e);
      } catch (IOException e) {
        savedException = e;
      }
      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
    }

    if (savedException != null) {
      throw savedException;
    }
  }

  /** Port of the server hosting the first meta region. */
  public static int getMetaRSPort(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
    }
  }

  /**
   * Due to async racing issue, a region may not be in the online region list of a region server
   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
3378 */ 3379 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3380 final long timeout) throws IOException, InterruptedException { 3381 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3382 while (true) { 3383 List<RegionInfo> regions = getAdmin().getRegions(server); 3384 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3385 long now = EnvironmentEdgeManager.currentTime(); 3386 if (now > timeoutTime) break; 3387 Thread.sleep(10); 3388 } 3389 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3390 } 3391 3392 /** 3393 * Check to make sure the region is open on the specified region server, but not on any other one. 3394 */ 3395 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3396 final long timeout) throws IOException, InterruptedException { 3397 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3398 while (true) { 3399 List<RegionInfo> regions = getAdmin().getRegions(server); 3400 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3401 List<JVMClusterUtil.RegionServerThread> rsThreads = 3402 getHBaseCluster().getLiveRegionServerThreads(); 3403 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3404 HRegionServer rs = rsThread.getRegionServer(); 3405 if (server.equals(rs.getServerName())) { 3406 continue; 3407 } 3408 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3409 for (HRegion r : hrs) { 3410 assertTrue("Region should not be double assigned", 3411 r.getRegionInfo().getRegionId() != hri.getRegionId()); 3412 } 3413 } 3414 return; // good, we are happy 3415 } 3416 long now = EnvironmentEdgeManager.currentTime(); 3417 if (now > timeoutTime) break; 3418 Thread.sleep(10); 3419 } 3420 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3421 } 3422 3423 public HRegion createTestRegion(String tableName, 
ColumnFamilyDescriptor cd) throws IOException { 3424 TableDescriptor td = 3425 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3426 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3427 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3428 } 3429 3430 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3431 BlockCache blockCache) throws IOException { 3432 TableDescriptor td = 3433 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3434 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3435 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3436 } 3437 3438 public static void setFileSystemURI(String fsURI) { 3439 FS_URI = fsURI; 3440 } 3441 3442 /** 3443 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3444 */ 3445 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3446 return new ExplainingPredicate<IOException>() { 3447 @Override 3448 public String explainFailure() throws IOException { 3449 final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager(); 3450 return "found in transition: " + am.getRegionsInTransition().toString(); 3451 } 3452 3453 @Override 3454 public boolean evaluate() throws IOException { 3455 HMaster master = getMiniHBaseCluster().getMaster(); 3456 if (master == null) return false; 3457 AssignmentManager am = master.getAssignmentManager(); 3458 if (am == null) return false; 3459 return !am.hasRegionsInTransition(); 3460 } 3461 }; 3462 } 3463 3464 /** 3465 * Returns a {@link Predicate} for checking that there are no procedure to region transition in 3466 * master 3467 */ 3468 public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() { 3469 return new ExplainingPredicate<IOException>() { 3470 
@Override 3471 public String explainFailure() throws IOException { 3472 final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager(); 3473 return "Number of procedure scheduled for region transit: " 3474 + am.getRegionTransitScheduledCount(); 3475 } 3476 3477 @Override 3478 public boolean evaluate() throws IOException { 3479 HMaster master = getMiniHBaseCluster().getMaster(); 3480 if (master == null) { 3481 return false; 3482 } 3483 AssignmentManager am = master.getAssignmentManager(); 3484 if (am == null) { 3485 return false; 3486 } 3487 return am.getRegionTransitScheduledCount() == 0; 3488 } 3489 }; 3490 } 3491 3492 /** 3493 * Returns a {@link Predicate} for checking that table is enabled 3494 */ 3495 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3496 return new ExplainingPredicate<IOException>() { 3497 @Override 3498 public String explainFailure() throws IOException { 3499 return explainTableState(tableName, TableState.State.ENABLED); 3500 } 3501 3502 @Override 3503 public boolean evaluate() throws IOException { 3504 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3505 } 3506 }; 3507 } 3508 3509 /** 3510 * Returns a {@link Predicate} for checking that table is enabled 3511 */ 3512 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3513 return new ExplainingPredicate<IOException>() { 3514 @Override 3515 public String explainFailure() throws IOException { 3516 return explainTableState(tableName, TableState.State.DISABLED); 3517 } 3518 3519 @Override 3520 public boolean evaluate() throws IOException { 3521 return getAdmin().isTableDisabled(tableName); 3522 } 3523 }; 3524 } 3525 3526 /** 3527 * Returns a {@link Predicate} for checking that table is enabled 3528 */ 3529 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3530 return new ExplainingPredicate<IOException>() { 3531 @Override 3532 
public String explainFailure() throws IOException { 3533 return explainTableAvailability(tableName); 3534 } 3535 3536 @Override 3537 public boolean evaluate() throws IOException { 3538 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3539 if (tableAvailable) { 3540 try (Table table = getConnection().getTable(tableName)) { 3541 TableDescriptor htd = table.getDescriptor(); 3542 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3543 .getAllRegionLocations()) { 3544 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3545 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3546 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3547 for (byte[] family : htd.getColumnFamilyNames()) { 3548 scan.addFamily(family); 3549 } 3550 try (ResultScanner scanner = table.getScanner(scan)) { 3551 scanner.next(); 3552 } 3553 } 3554 } 3555 } 3556 return tableAvailable; 3557 } 3558 }; 3559 } 3560 3561 /** 3562 * Wait until no regions in transition. 3563 * @param timeout How long to wait. 3564 */ 3565 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3566 waitFor(timeout, predicateNoRegionsInTransition()); 3567 } 3568 3569 /** 3570 * Wait until no regions in transition. 3571 * @param timeout How long to wait. 3572 */ 3573 public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException { 3574 waitFor(timeout, predicateNoRegionTransitScheduled()); 3575 } 3576 3577 /** 3578 * Wait until no regions in transition. (time limit 15min) 3579 */ 3580 public void waitUntilNoRegionsInTransition() throws IOException { 3581 waitUntilNoRegionsInTransition(15 * 60000); 3582 } 3583 3584 /** 3585 * Wait until no TRSP is present 3586 */ 3587 public void waitUntilNoRegionTransitScheduled() throws IOException { 3588 waitUntilNoRegionTransitScheduled(15 * 60000); 3589 } 3590 3591 /** 3592 * Wait until labels is ready in VisibilityLabelsCache. 
3593 */ 3594 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3595 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3596 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3597 3598 @Override 3599 public boolean evaluate() { 3600 for (String label : labels) { 3601 if (labelsCache.getLabelOrdinal(label) == 0) { 3602 return false; 3603 } 3604 } 3605 return true; 3606 } 3607 3608 @Override 3609 public String explainFailure() { 3610 for (String label : labels) { 3611 if (labelsCache.getLabelOrdinal(label) == 0) { 3612 return label + " is not available yet"; 3613 } 3614 } 3615 return ""; 3616 } 3617 }); 3618 } 3619 3620 /** 3621 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3622 * available. 3623 * @return the list of column descriptors 3624 */ 3625 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3626 return generateColumnDescriptors(""); 3627 } 3628 3629 /** 3630 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3631 * available. 
3632 * @param prefix family names prefix 3633 * @return the list of column descriptors 3634 */ 3635 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3636 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3637 long familyId = 0; 3638 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3639 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3640 for (BloomType bloomType : BloomType.values()) { 3641 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3642 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3643 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3644 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3645 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3646 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3647 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3648 familyId++; 3649 } 3650 } 3651 } 3652 return columnFamilyDescriptors; 3653 } 3654 3655 /** 3656 * Get supported compression algorithms. 3657 * @return supported compression algorithms. 
3658 */ 3659 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3660 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3661 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3662 for (String algoName : allAlgos) { 3663 try { 3664 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3665 algo.getCompressor(); 3666 supportedAlgos.add(algo); 3667 } catch (Throwable t) { 3668 // this algo is not available 3669 } 3670 } 3671 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3672 } 3673 3674 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3675 Scan scan = new Scan().withStartRow(row); 3676 scan.setReadType(ReadType.PREAD); 3677 scan.setCaching(1); 3678 scan.setReversed(true); 3679 scan.addFamily(family); 3680 try (RegionScanner scanner = r.getScanner(scan)) { 3681 List<Cell> cells = new ArrayList<>(1); 3682 scanner.next(cells); 3683 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3684 return null; 3685 } 3686 return Result.create(cells); 3687 } 3688 } 3689 3690 private boolean isTargetTable(final byte[] inRow, Cell c) { 3691 String inputRowString = Bytes.toString(inRow); 3692 int i = inputRowString.indexOf(HConstants.DELIMITER); 3693 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3694 int o = outputRowString.indexOf(HConstants.DELIMITER); 3695 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3696 } 3697 3698 /** 3699 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3700 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3701 * kerby KDC server and utility for using it, 3702 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 3703 * less baggage. It came in in HBASE-5291. 
3704 */ 3705 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3706 Properties conf = MiniKdc.createConf(); 3707 conf.put(MiniKdc.DEBUG, true); 3708 MiniKdc kdc = null; 3709 File dir = null; 3710 // There is time lag between selecting a port and trying to bind with it. It's possible that 3711 // another service captures the port in between which'll result in BindException. 3712 boolean bindException; 3713 int numTries = 0; 3714 do { 3715 try { 3716 bindException = false; 3717 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3718 kdc = new MiniKdc(conf, dir); 3719 kdc.start(); 3720 } catch (BindException e) { 3721 FileUtils.deleteDirectory(dir); // clean directory 3722 numTries++; 3723 if (numTries == 3) { 3724 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3725 throw e; 3726 } 3727 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3728 bindException = true; 3729 } 3730 } while (bindException); 3731 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3732 return kdc; 3733 } 3734 3735 public int getNumHFiles(final TableName tableName, final byte[] family) { 3736 int numHFiles = 0; 3737 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3738 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3739 } 3740 return numHFiles; 3741 } 3742 3743 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3744 final byte[] family) { 3745 int numHFiles = 0; 3746 for (Region region : rs.getRegions(tableName)) { 3747 numHFiles += region.getStore(family).getStorefilesCount(); 3748 } 3749 return numHFiles; 3750 } 3751 3752 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3753 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3754 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3755 
Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3756 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3757 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3758 it2 = rtdFamilies.iterator(); it.hasNext();) { 3759 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3760 } 3761 } 3762 3763 /** 3764 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3765 * invocations. 3766 */ 3767 public static void await(final long sleepMillis, final BooleanSupplier condition) 3768 throws InterruptedException { 3769 try { 3770 while (!condition.getAsBoolean()) { 3771 Thread.sleep(sleepMillis); 3772 } 3773 } catch (RuntimeException e) { 3774 if (e.getCause() instanceof AssertionError) { 3775 throw (AssertionError) e.getCause(); 3776 } 3777 throw e; 3778 } 3779 } 3780 3781 public void createRegionDir(RegionInfo hri) throws IOException { 3782 Path rootDir = getDataTestDir(); 3783 Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable()); 3784 Path regionDir = new Path(tableDir, hri.getEncodedName()); 3785 FileSystem fs = getTestFileSystem(); 3786 if (!fs.exists(regionDir)) { 3787 fs.mkdirs(regionDir); 3788 } 3789 } 3790 3791 public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem) 3792 throws IOException { 3793 Path tableDir = 3794 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable()); 3795 HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir, 3796 regionInfo); 3797 } 3798}