/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for the old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need while testing. It
 * manages only one cluster at a time. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on the classpath and hbase-site.xml for logging and test-run
 * configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility.
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
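  // A minimal usage sketch of this utility (illustrative only; the TEST_UTIL field and the
  // JUnit lifecycle methods below are hypothetical test code, not part of this class): create
  // one instance per test class, start the mini cluster once, and shut it down at the end.
  //
  //   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  //
  //   @BeforeClass
  //   public static void beforeClass() throws Exception {
  //     TEST_UTIL.startMiniCluster();
  //   }
  //
  //   @AfterClass
  //   public static void afterClass() throws Exception {
  //     TEST_UTIL.shutdownMiniCluster();
  //   }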
  /**
   * Checks to see if a specific port is available.
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();
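  // A hedged sketch of how these combination lists are typically consumed (the test class
  // below is hypothetical): hand them to a JUnit Parameterized runner so each combination
  // drives one run of the test.
  //
  //   @RunWith(Parameterized.class)
  //   public class TestWithBloomAndCompression {
  //     @Parameterized.Parameters
  //     public static Collection<Object[]> data() {
  //       return HBaseTestingUtil.BLOOM_AND_COMPRESSION_COMBINATIONS;
  //     }
  //
  //     public TestWithBloomAndCompression(Compression.Algorithm comprAlgo, BloomType bloomType) {
  //       // store the parameters for use in test methods
  //     }
  //   }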
  /**
   * <p>
   * Create an HBaseTestingUtility using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()},
   * tmp data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtility using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()},
   * tmp data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, default it to true, so tests get random
    // port assignment unless they explicitly opt out
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Start a secure mini cluster with the given kdc and principals.
   * @param kdc              Mini kdc server
   * @param servicePrincipal Service principal without realm.
   * @param spnegoPrincipal  Spnego principal without realm.
   * @return Handler to shut down the cluster
   */
  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
    String spnegoPrincipal) throws Exception {
    Configuration conf = getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
      spnegoPrincipal + '@' + kdc.getRealm());

    startMiniCluster();
    try {
      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      shutdownMiniCluster();
      throw e;
    }

    return this::shutdownMiniCluster;
  }
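  // A hedged sketch of startSecureMiniCluster in use (the kdc working dir, keytab file, and
  // principal names below are illustrative assumptions, not defined in this file):
  //
  //   MiniKdc kdc = new MiniKdc(MiniKdc.createConf(), kdcDir);
  //   kdc.start();
  //   kdc.createPrincipal(keytabFile, "hbase/localhost", "HTTP/localhost");
  //   try (Closeable cluster =
  //     TEST_UTIL.startSecureMiniCluster(kdc, "hbase/localhost", "HTTP/localhost")) {
  //     // run secured assertions here
  //   }
  //   kdc.stop();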
  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is
   * keyed by the Configuration. If say, a Connection was being used against a cluster that had
   * been shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be
   * wholesome. Rather than using the returned Configuration directly, it is usually best to make
   * a copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so
   * we can have many concurrent tests running if we need to. Modifying a System property is not
   * the way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but nothing can be done about it at the moment; single instance only is
   * how the minidfscluster works. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
   * conf, and as a system property for hadoop.tmp.dir (we do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Erasing configuration value by system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the
   *         test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }
  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test
   * relies on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test
   * relies on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up at " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }
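  // A hedged example of the test-data helpers above (the subdir name is illustrative):
  // stage per-test files on the test filesystem and clean them up afterwards.
  //
  //   Path staging = TEST_UTIL.getDataTestDirOnTestFS("my-test-output");
  //   FileSystem fs = TEST_UTIL.getTestFileSystem();
  //   try (FSDataOutputStream out = fs.create(new Path(staging, "part-0"))) {
  //     out.writeUTF("sample");
  //   }
  //   TEST_UTIL.cleanupDataTestDirOnTestFS("my-test-output");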
  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version > 3.2.3 or 3.3.4, "
            + "see HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }
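  // A hedged sketch of starting DFS with explicit datanode hostnames for block-location
  // verification, per the startMiniDFSCluster(String[]) javadoc above (the hostnames are
  // illustrative; they must resolve on the test machine):
  //
  //   String[] hosts = { "host1.example.org", "host2.example.org", "host3.example.org" };
  //   MiniDFSCluster dfs = TEST_UTIL.startMiniDFSCluster(hosts);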
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }
  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir
   * -- in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying this parameter on the command line. If not set, default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS
   * settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }
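  // A hedged example of toggling short-circuit reads for a test run, as isReadShortCircuitOn()
  // describes (the Maven invocation and test name are illustrative):
  //
  //   mvn test -Dtest=TestMyScan -Dhbase.tests.use.shortcircuit.reads=true
  //
  // or, from code, before the mini cluster starts:
  //
  //   System.setProperty("hbase.tests.use.shortcircuit.reads", "true");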
  /**
   * Shuts down the instance created by the call to {@link #startMiniDFSCluster(int)} or does
   * nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will
      // not have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given slave node number.
   * All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies the
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under the System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the server wait until this exact number of
    // region servers are connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }
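  // A hedged sketch of a stepped startup with an explicit option set (the counts are
  // illustrative): bring up dfs and zk yourself, then layer hbase on top.
  //
  //   TEST_UTIL.startMiniDFSCluster(3);
  //   TEST_UTIL.startMiniZKCluster(1);
  //   StartTestingClusterOption option = StartTestingClusterOption.builder()
  //     .numMasters(2).numRegionServers(3).build();
  //   TEST_UTIL.startMiniHBaseCluster(option);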
  /**
   * Starts up the mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }
  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .rsPorts(rsPorts).createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if
   * you want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }
  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
  }

  /**
   * Returns the current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops the mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shutdown the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException thrown in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }
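  // A hedged sketch of a stop/start cycle that keeps dfs and zk up, as described by
  // restartHBaseCluster (the server count is illustrative):
  //
  //   TEST_UTIL.shutdownMiniHBaseCluster(); // hbase only; dfs and zk stay up
  //   TEST_UTIL.restartHBaseCluster(1);     // bring hbase back with one region server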
  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is
   * true, a new root directory path is fetched irrespective of whether it has been fetched before
   * or not. If false, the previous path is used. Note: this does not cause the root dir to be
   * created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that the
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method. The root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if
   *               it has been fetched already. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that the
   * <code>create</code> flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }
  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method. The root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operation.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster.
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the given table in the mini hbase cluster.
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster.
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster.
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }
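  // A hedged example of the simple createTable helpers above in a test body (the table,
  // family, and row names are illustrative):
  //
  //   TableName tn = TableName.valueOf("testTable");
  //   try (Table table = TEST_UTIL.createTable(tn, Bytes.toBytes("cf"))) {
  //     table.put(new Put(Bytes.toBytes("row1"))
  //       .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
  //     Result r = table.get(new Get(Bytes.toBytes("row1")));
  //     assertTrue(r.containsColumn(Bytes.toBytes("cf"), Bytes.toBytes("q")));
  //   }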
  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the splitkeys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException throws IOException
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[] startKey, byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }
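  // A hedged example of pre-splitting through the split-key variants above (the table name,
  // family, and split points are illustrative): three explicit split keys yield four regions.
  //
  //   byte[][] splits = { Bytes.toBytes("b"), Bytes.toBytes("m"), Bytes.toBytes("t") };
  //   Table t = TEST_UTIL.createTable(TableName.valueOf("presplit"),
  //     new byte[][] { Bytes.toBytes("cf") }, splits);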
  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param type      Bloom type
   * @param blockSize block size
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    BloomType type, int blockSize, Configuration c) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setBloomFilterType(type).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfdb.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfdb.build());
    }
    TableDescriptor td = builder.build();
    if (splitKeys != null) {
      getAdmin().createTable(td, splitKeys);
    } else {
      getAdmin().createTable(td);
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(td.getTableName());
    return getConnection().getTable(td.getTableName());
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param splitRows array of split keys
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    if (isNewVersionBehaviorEnabled()) {
      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        builder.setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
      }
    }
    if (splitRows != null) {
      getAdmin().createTable(builder.build(), splitRows);
    } else {
      getAdmin().createTable(builder.build());
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(htd.getTableName());
    return getConnection().getTable(htd.getTableName());
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the replica count
   * @param c            Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount, final Configuration c) throws IOException {
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
    return createTable(htd, families, splitKeys, c);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, int numVersions)
    throws IOException {
    return createTable(tableName, new byte[][] { family }, numVersions);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, (byte[][]) null);
  }
1437 */ 1438 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1439 byte[][] splitKeys) throws IOException { 1440 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1441 for (byte[] family : families) { 1442 ColumnFamilyDescriptorBuilder cfBuilder = 1443 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1444 if (isNewVersionBehaviorEnabled()) { 1445 cfBuilder.setNewVersionBehavior(true); 1446 } 1447 builder.setColumnFamily(cfBuilder.build()); 1448 } 1449 if (splitKeys != null) { 1450 getAdmin().createTable(builder.build(), splitKeys); 1451 } else { 1452 getAdmin().createTable(builder.build()); 1453 } 1454 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1455 // assigned 1456 waitUntilAllRegionsAssigned(tableName); 1457 return getConnection().getTable(tableName); 1458 } 1459 1460 /** 1461 * Create a table with multiple regions. 1462 * @return A Table instance for the created table. 1463 */ 1464 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1465 throws IOException { 1466 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1467 } 1468 1469 /** 1470 * Create a table. 1471 * @return A Table instance for the created table. 1472 */ 1473 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1474 throws IOException { 1475 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1476 for (byte[] family : families) { 1477 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1478 .setMaxVersions(numVersions).setBlocksize(blockSize); 1479 if (isNewVersionBehaviorEnabled()) { 1480 cfBuilder.setNewVersionBehavior(true); 1481 } 1482 builder.setColumnFamily(cfBuilder.build()); 1483 } 1484 getAdmin().createTable(builder.build()); 1485 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1486 // assigned 1487 waitUntilAllRegionsAssigned(tableName); 1488 return getConnection().getTable(tableName); 1489 } 1490 1491 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1492 String cpName) throws IOException { 1493 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1494 for (byte[] family : families) { 1495 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1496 .setMaxVersions(numVersions).setBlocksize(blockSize); 1497 if (isNewVersionBehaviorEnabled()) { 1498 cfBuilder.setNewVersionBehavior(true); 1499 } 1500 builder.setColumnFamily(cfBuilder.build()); 1501 } 1502 if (cpName != null) { 1503 builder.setCoprocessor(cpName); 1504 } 1505 getAdmin().createTable(builder.build()); 1506 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1507 // assigned 1508 waitUntilAllRegionsAssigned(tableName); 1509 return getConnection().getTable(tableName); 1510 } 1511 1512 /** 1513 * Create a table. 1514 * @return A Table instance for the created table. 
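 * <p>
 * Sketch (names are placeholders): the families array and the versions array are matched up
 * by index, so the two arrays below must have the same length:
 *
 * <pre>
 * // Family "a" keeps 1 version, family "b" keeps 5.
 * Table t = util.createTable(TableName.valueOf("demo"),
 *   new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }, new int[] { 1, 5 });
 * </pre>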
1515 */ 1516 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1517 throws IOException { 1518 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1519 int i = 0; 1520 for (byte[] family : families) { 1521 ColumnFamilyDescriptorBuilder cfBuilder = 1522 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1523 if (isNewVersionBehaviorEnabled()) { 1524 cfBuilder.setNewVersionBehavior(true); 1525 } 1526 builder.setColumnFamily(cfBuilder.build()); 1527 i++; 1528 } 1529 getAdmin().createTable(builder.build()); 1530 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1531 // assigned 1532 waitUntilAllRegionsAssigned(tableName); 1533 return getConnection().getTable(tableName); 1534 } 1535 1536 /** 1537 * Create a table. 1538 * @return A Table instance for the created table. 1539 */ 1540 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1541 throws IOException { 1542 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1543 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1544 if (isNewVersionBehaviorEnabled()) { 1545 cfBuilder.setNewVersionBehavior(true); 1546 } 1547 builder.setColumnFamily(cfBuilder.build()); 1548 getAdmin().createTable(builder.build(), splitRows); 1549 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1550 // assigned 1551 waitUntilAllRegionsAssigned(tableName); 1552 return getConnection().getTable(tableName); 1553 } 1554 1555 /** 1556 * Create a table with multiple regions. 1557 * @return A Table instance for the created table. 1558 */ 1559 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1560 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1561 } 1562 1563 /** 1564 * Set the number of Region replicas. 1565 */ 1566 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1567 throws IOException, InterruptedException { 1568 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1569 .setRegionReplication(replicaCount).build(); 1570 admin.modifyTable(desc); 1571 } 1572 1573 /** 1574 * Set the number of Region replicas. 
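 * <p>
 * Illustrative call (the async admin and table are assumed to exist in the caller's test):
 *
 * <pre>
 * // Raise region replication to 3; blocks until the async modifyTable completes.
 * HBaseTestingUtil.setReplicas(asyncAdmin, TableName.valueOf("demo"), 3);
 * </pre>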
1575 */ 1576 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1577 throws ExecutionException, IOException, InterruptedException { 1578 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1579 .setRegionReplication(replicaCount).build(); 1580 admin.modifyTable(desc).get(); 1581 } 1582 1583 /** 1584 * Drop an existing table 1585 * @param tableName existing table 1586 */ 1587 public void deleteTable(TableName tableName) throws IOException { 1588 try { 1589 getAdmin().disableTable(tableName); 1590 } catch (TableNotEnabledException e) { 1591 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1592 } 1593 getAdmin().deleteTable(tableName); 1594 } 1595 1596 /** 1597 * Drop an existing table 1598 * @param tableName existing table 1599 */ 1600 public void deleteTableIfAny(TableName tableName) throws IOException { 1601 try { 1602 deleteTable(tableName); 1603 } catch (TableNotFoundException e) { 1604 // ignore 1605 } 1606 } 1607 1608 // ========================================================================== 1609 // Canned table and table descriptor creation 1610 1611 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1612 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1613 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1614 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1615 private static final int MAXVERSIONS = 3; 1616 1617 public static final char FIRST_CHAR = 'a'; 1618 public static final char LAST_CHAR = 'z'; 1619 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1620 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1621 1622 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1623 return createModifyableTableDescriptor(TableName.valueOf(name), 1624 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1625 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1626 } 1627 1628 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1629 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1630 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1631 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1632 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1633 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1634 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1635 if (isNewVersionBehaviorEnabled()) { 1636 cfBuilder.setNewVersionBehavior(true); 1637 } 1638 builder.setColumnFamily(cfBuilder.build()); 1639 } 1640 return builder.build(); 1641 } 1642 1643 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1644 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1645 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1646 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1647 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1648 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1649 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1650 if (isNewVersionBehaviorEnabled()) { 1651 cfBuilder.setNewVersionBehavior(true); 1652 } 1653 builder.setColumnFamily(cfBuilder.build()); 
1654 }
1655 return builder;
1656 }
1657
1658 /**
1659 * Create a table of name <code>name</code>.
1660 * @param name Name to give table.
1661 * @return Table descriptor.
1662 */
1663 public TableDescriptor createTableDescriptor(final TableName name) {
1664 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1665 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1666 }
1667
1668 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1669 return createTableDescriptor(tableName, new byte[][] { family }, 1);
1670 }
1671
1672 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1673 int maxVersions) {
1674 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1675 for (byte[] family : families) {
1676 ColumnFamilyDescriptorBuilder cfBuilder =
1677 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1678 if (isNewVersionBehaviorEnabled()) {
1679 cfBuilder.setNewVersionBehavior(true);
1680 }
1681 builder.setColumnFamily(cfBuilder.build());
1682 }
1683 return builder.build();
1684 }
1685
1686 /**
1687 * Create an HRegion that writes to the local tmp dirs.
1688 * @param desc a table descriptor indicating which table the region belongs to
1689 * @param startKey the start boundary of the region
1690 * @param endKey the end boundary of the region
1691 * @return a region that writes to local dir for testing
1692 */
1693 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1694 throws IOException {
1695 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1696 .setEndKey(endKey).build();
1697 return createLocalHRegion(hri, desc);
1698 }
1699
1700 /**
1701 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1702 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1703 */
1704 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1705 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1706 }
1707
1708 /**
1709 * Create an HRegion that writes to the local tmp dirs with the specified WAL
1710 * @param info the region info
1711 * @param conf configuration
1712 * @param desc table descriptor
1713 * @param wal wal for this region.
1714 * @return the created HRegion
1715 */
1716 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1717 WAL wal) throws IOException {
1718 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1719 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1720 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1721 }
1722
1723 /**
1724 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1725 * when done.
1726 */
1727 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1728 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1729 throws IOException {
1730 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1731 durability, wal, null, families);
1732 }
1733
1734 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1735 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1736 boolean[] compactedMemStore, byte[]... families) throws IOException {
1737 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1738 builder.setReadOnly(isReadOnly);
1739 int i = 0;
1740 for (byte[] family : families) {
1741 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1742 if (compactedMemStore != null && i < compactedMemStore.length) {
1743 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1744 } else {
1745 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1747 }
1748 i++;
1749 // Keep every version; tests may read back multiple versions of the same cell.
1750 cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1751 builder.setColumnFamily(cfBuilder.build());
1752 }
1753 builder.setDurability(durability);
1754 RegionInfo info =
1755 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1756 return createLocalHRegion(info, conf, builder.build(), wal);
1757 }
1758
1759 //
1760 // ==========================================================================
1761
1762 /**
1763 * Provide an existing table name to truncate. Scans the table and issues a delete for each row
1764 * read.
1765 * @param tableName existing table
1766 * @return Table for the given table name
1767 */
1768 public Table deleteTableData(TableName tableName) throws IOException {
1769 Table table = getConnection().getTable(tableName);
1770 Scan scan = new Scan();
1771 try (ResultScanner resScan = table.getScanner(scan)) {
1772 for (Result res : resScan) {
1773 Delete del = new Delete(res.getRow());
1774 table.delete(del);
1775 }
1776 }
1777 return table;
1778 }
1780
1781 /**
1782 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1783 * table.
1784 * @param tableName table which must exist.
1785 * @param preserveRegions keep the existing split points
1786 * @return Table for the new table
1787 */
1788 public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1789 throws IOException {
1790 Admin admin = getAdmin();
1791 if (!admin.isTableDisabled(tableName)) {
1792 admin.disableTable(tableName);
1793 }
1794 admin.truncateTable(tableName, preserveRegions);
1795 return getConnection().getTable(tableName);
1796 }
1797
1798 /**
1799 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1800 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1801 * preserve regions of existing table.
1802 * @param tableName table which must exist.
1803 * @return Table for the new table
1804 */
1805 public Table truncateTable(final TableName tableName) throws IOException {
1806 return truncateTable(tableName, false);
1807 }
1808
1809 /**
1810 * Load table with rows from 'aaa' to 'zzz'.
1811 * @param t Table
1812 * @param f Family
1813 * @return Count of rows loaded.
1814 */
1815 public int loadTable(final Table t, final byte[] f) throws IOException {
1816 return loadTable(t, new byte[][] { f });
1817 }
1818
1819 /**
1820 * Load table with rows from 'aaa' to 'zzz'.
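 * <p>
 * Sketch under assumed names (the table is created earlier in the same test):
 *
 * <pre>
 * Table t = util.createTable(TableName.valueOf("demo"), Bytes.toBytes("cf"), 1);
 * int rows = util.loadTable(t, Bytes.toBytes("cf"), false); // false: skip the WAL
 * // rows == 26 * 26 * 26, one row for every key from "aaa" through "zzz"
 * </pre>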
1821 * @param t Table 1822 * @param f Family 1823 * @return Count of rows loaded. 1824 */ 1825 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1826 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1827 } 1828 1829 /** 1830 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1831 * @param t Table 1832 * @param f Array of Families to load 1833 * @return Count of rows loaded. 1834 */ 1835 public int loadTable(final Table t, final byte[][] f) throws IOException { 1836 return loadTable(t, f, null); 1837 } 1838 1839 /** 1840 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1841 * @param t Table 1842 * @param f Array of Families to load 1843 * @param value the values of the cells. If null is passed, the row key is used as value 1844 * @return Count of rows loaded. 1845 */ 1846 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1847 return loadTable(t, f, value, true); 1848 } 1849 1850 /** 1851 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1852 * @param t Table 1853 * @param f Array of Families to load 1854 * @param value the values of the cells. If null is passed, the row key is used as value 1855 * @return Count of rows loaded. 1856 */ 1857 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1858 throws IOException { 1859 List<Put> puts = new ArrayList<>(); 1860 for (byte[] row : HBaseTestingUtil.ROWS) { 1861 Put put = new Put(row); 1862 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1863 for (int i = 0; i < f.length; i++) { 1864 byte[] value1 = value != null ? value : row; 1865 put.addColumn(f[i], f[i], value1); 1866 } 1867 puts.add(put); 1868 } 1869 t.put(puts); 1870 return puts.size(); 1871 } 1872 1873 /** 1874 * A tracker for tracking and validating table rows generated with 1875 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1876 */ 1877 public static class SeenRowTracker { 1878 int dim = 'z' - 'a' + 1; 1879 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1880 byte[] startRow; 1881 byte[] stopRow; 1882 1883 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1884 this.startRow = startRow; 1885 this.stopRow = stopRow; 1886 } 1887 1888 void reset() { 1889 for (byte[] row : ROWS) { 1890 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1891 } 1892 } 1893 1894 int i(byte b) { 1895 return b - 'a'; 1896 } 1897 1898 public void addRow(byte[] row) { 1899 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1900 } 1901 1902 /** 1903 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1904 * rows none 1905 */ 1906 public void validate() { 1907 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1908 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1909 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1910 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1911 int expectedCount = 0; 1912 if ( 1913 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1914 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1915 ) { 1916 expectedCount = 1; 1917 } 1918 if (count != expectedCount) { 1919 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1920 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1921 + "instead of " + expectedCount); 1922 } 1923 } 1924 } 1925 } 1926 } 1927 } 1928 1929 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1930 return 
loadRegion(r, f, false);
1931 }
1932
1933 public int loadRegion(final Region r, final byte[] f) throws IOException {
1934 return loadRegion((HRegion) r, f);
1935 }
1936
1937 /**
1938 * Load region with rows from 'aaa' to 'zzz'.
1939 * @param r Region
1940 * @param f Family
1941 * @param flush flush the cache if true
1942 * @return Count of rows loaded.
1943 */
1944 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1945 byte[] k = new byte[3];
1946 int rowCount = 0;
1947 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1948 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1949 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1950 k[0] = b1;
1951 k[1] = b2;
1952 k[2] = b3;
1953 Put put = new Put(k);
1954 put.setDurability(Durability.SKIP_WAL); // bulk test load; always skip the WAL
1955 put.addColumn(f, null, k);
1959 int preRowCount = rowCount;
1960 int pause = 10;
1961 int maxPause = 1000;
1962 while (rowCount == preRowCount) {
1963 try {
1964 r.put(put);
1965 rowCount++;
1966 } catch (RegionTooBusyException e) {
1967 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1968 Threads.sleep(pause);
1969 }
1970 }
1971 }
1972 }
1973 if (flush) {
1974 r.flush(true);
1975 }
1976 }
1977 return rowCount;
1978 }
1979
1980 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1981 throws IOException {
1982 for (int i = startRow; i < endRow; i++) {
1983 byte[] data = Bytes.toBytes(String.valueOf(i));
1984 Put put = new Put(data);
1985 put.addColumn(f, null, data);
1986 t.put(put);
1987 }
1988 }
1989
1990 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1991 throws IOException {
1992 for (int i = 0; i < totalRows; i++) {
1993 byte[] row = new byte[rowSize];
1994 Bytes.random(row);
1995 Put put = new Put(row);
1996 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1997 t.put(put);
1998 }
1999 }
2000
2001 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2002 int replicaId) throws IOException {
2003 for (int i = startRow; i < endRow; i++) {
2004 String failMsg = "Failed verification of row: " + i;
2005 byte[] data = Bytes.toBytes(String.valueOf(i));
2006 Get get = new Get(data);
2007 get.setReplicaId(replicaId);
2008 get.setConsistency(Consistency.TIMELINE);
2009 Result result = table.get(get);
2010 assertTrue(failMsg, result.containsColumn(f, null));
2011 assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2012 Cell cell = result.getColumnLatestCell(f, null);
2013 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2014 cell.getValueOffset(), cell.getValueLength()));
2015 }
2016 }
2017
2018 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2019 throws IOException {
2020 verifyNumericRows((HRegion) region, f, startRow, endRow);
2021 }
2022
2023 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2024 throws IOException {
2025 verifyNumericRows(region, f, startRow, endRow, true);
2026 }
2027
2028 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2029 final boolean present) throws IOException {
2030 verifyNumericRows((HRegion) region, f, startRow, endRow, present);
2031 }
2032
2033 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2034 final boolean present) throws IOException {
2035 for (int i = startRow; i < endRow; i++) {
2036 String failMsg = "Failed verification of row: " + i;
2037 byte[] data = Bytes.toBytes(String.valueOf(i));
2038 Result result = region.get(new Get(data));
2039
2040 boolean hasResult = result != null && !result.isEmpty();
2041 assertEquals(failMsg + result, present, hasResult);
2042 if (!present) continue;
2043
2044 assertTrue(failMsg, result.containsColumn(f, null));
2045 assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2046 Cell cell = result.getColumnLatestCell(f, null);
2047 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2048 cell.getValueOffset(), cell.getValueLength()));
2049 }
2050 }
2051
2052 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2053 throws IOException {
2054 for (int i = startRow; i < endRow; i++) {
2055 byte[] data = Bytes.toBytes(String.valueOf(i));
2056 Delete delete = new Delete(data);
2057 delete.addFamily(f);
2058 t.delete(delete);
2059 }
2060 }
2061
2062 /**
2063 * Return the number of rows in the given table.
2064 * @param table the table to count rows in
2065 * @return count of rows
2066 */
2067 public static int countRows(final Table table) throws IOException {
2068 return countRows(table, new Scan());
2069 }
2070
2071 public static int countRows(final Table table, final Scan scan) throws IOException {
2072 try (ResultScanner results = table.getScanner(scan)) {
2073 int count = 0;
2074 while (results.next() != null) {
2075 count++;
2076 }
2077 return count;
2078 }
2079 }
2080
2081 public static int countRows(final Table table, final byte[]... families) throws IOException {
2082 Scan scan = new Scan();
2083 for (byte[] family : families) {
2084 scan.addFamily(family);
2085 }
2086 return countRows(table, scan);
2087 }
2088
2089 /**
2090 * Return the number of rows in the given table.
2091 */
2092 public int countRows(final TableName tableName) throws IOException {
2093 try (Table table = getConnection().getTable(tableName)) {
2094 return countRows(table);
2095 }
2096 }
2097
2098 public static int countRows(final Region region) throws IOException {
2099 return countRows(region, new Scan());
2100 }
2101
2102 public static int countRows(final Region region, final Scan scan) throws IOException {
2103 try (InternalScanner scanner = region.getScanner(scan)) {
2104 return countRows(scanner);
2105 }
2106 }
2107
2108 public static int countRows(final InternalScanner scanner) throws IOException {
2109 int scannedCount = 0;
2110 List<Cell> results = new ArrayList<>();
2111 boolean hasMore = true;
2112 while (hasMore) {
2113 hasMore = scanner.next(results);
2114 scannedCount += results.size();
2115 results.clear();
2116 }
2117 return scannedCount;
2118 }
2119
2120 /**
2121 * Return an MD5 digest of the entire contents of a table.
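 * <p>
 * Intended for before/after comparisons rather than for the digest text itself; a sketch:
 *
 * <pre>
 * String before = util.checksumRows(table);
 * // ... run the operation under test ...
 * assertEquals(before, util.checksumRows(table)); // same rows, same digest
 * </pre>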
2122 */ 2123 public String checksumRows(final Table table) throws Exception { 2124 MessageDigest digest = MessageDigest.getInstance("MD5"); 2125 try (ResultScanner results = table.getScanner(new Scan())) { 2126 for (Result res : results) { 2127 digest.update(res.getRow()); 2128 } 2129 } 2130 return digest.toString(); 2131 } 2132 2133 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2134 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2135 static { 2136 int i = 0; 2137 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2138 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2139 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2140 ROWS[i][0] = b1; 2141 ROWS[i][1] = b2; 2142 ROWS[i][2] = b3; 2143 i++; 2144 } 2145 } 2146 } 2147 } 2148 2149 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2150 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2151 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2152 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2153 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2154 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2155 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2156 2157 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2158 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2159 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2160 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2161 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2162 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2163 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2164 2165 /** 2166 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2167 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 2168 * @return list of region info for regions added to meta 2169 */ 2170 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2171 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2172 try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 2173 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2174 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2175 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2176 TableState.State.ENABLED); 2177 // add custom ones 2178 for (int i = 0; i < startKeys.length; i++) { 2179 int j = (i + 1) % startKeys.length; 2180 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2181 .setEndKey(startKeys[j]).build(); 2182 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2183 newRegions.add(hri); 2184 } 2185 return newRegions; 2186 } 2187 } 2188 2189 /** 2190 * Create an unmanaged WAL. Be sure to close it when you're through. 
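 * <p>
 * Sketch of the create/close contract ({@code conf}, {@code rootDir} and {@code regionInfo}
 * are assumed to come from the surrounding test):
 *
 * <pre>
 * WAL wal = createWal(conf, rootDir, regionInfo);
 * try {
 *   // ... hand the WAL to a region, e.g. createLocalHRegion(regionInfo, conf, desc, wal) ...
 * } finally {
 *   wal.close();
 * }
 * </pre>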
2191 */
2192 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2193 throws IOException {
2194 // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2195 // unless we pass it along via the conf.
2196 Configuration confForWAL = new Configuration(conf);
2197 CommonFSUtils.setRootDir(confForWAL, rootDir);
2198 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
2199 .getWAL(hri);
2200 }
2201
2202 /**
2203 * Create a region with its own WAL. Be sure to call
2204 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2205 */
2206 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2207 final Configuration conf, final TableDescriptor htd) throws IOException {
2208 return createRegionAndWAL(info, rootDir, conf, htd, true);
2209 }
2210
2211 /**
2212 * Create a region with its own WAL. Be sure to call
2213 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2214 */
2215 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2216 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2217 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2218 region.setBlockCache(blockCache);
2219 region.initialize();
2220 return region;
2221 }
2222
2223 /**
2224 * Create a region with its own WAL. Be sure to call
2225 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2226 */
2227 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2228 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2229 throws IOException {
2230 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2231 region.setMobFileCache(mobFileCache);
2232 region.initialize();
2233 return region;
2234 }
2235
2236 /**
2237 * Create a region with its own WAL. Be sure to call
2238 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2239 */
2240 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2241 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2242 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2243 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2244 WAL wal = createWal(conf, rootDir, info);
2245 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2246 }
2247
2248 /**
2249 * Find a region server other than the one passed in.
2250 * @return another region server, or null if there is none
2251 */
2252 public HRegionServer getOtherRegionServer(HRegionServer rs) {
2253 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2254 if (rst.getRegionServer() != rs) {
2255 return rst.getRegionServer();
2256 }
2257 }
2258 return null;
2259 }
2260
2261 /**
2262 * Tool to get the reference to the region server object that holds the region of the specified
2263 * user table.
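 * <p>
 * Typical use in failover tests (sketch; the table is assumed to exist and be assigned):
 *
 * <pre>
 * HRegionServer rs = util.getRSForFirstRegionInTable(TableName.valueOf("demo"));
 * rs.abort("test", new Exception("killing the server that hosts the first region"));
 * </pre>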
2264 * @param tableName user table to lookup in hbase:meta
2265 * @return region server that holds it, null if the row doesn't exist
2266 */
2267 public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2268 throws IOException, InterruptedException {
2269 List<RegionInfo> regions = getAdmin().getRegions(tableName);
2270 if (regions == null || regions.isEmpty()) {
2271 return null;
2272 }
2273 LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2274
2275 byte[] firstRegionName =
2276 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2277 .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2278
2279 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2280 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2281 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2282 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2283 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2284 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2285 while (retrier.shouldRetry()) {
2286 int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2287 if (index != -1) {
2288 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2289 }
2290 // Came back -1. Region may not be online yet. Sleep a while.
2291 retrier.sleepUntilNextRetry();
2292 }
2293 return null;
2294 }
2295
2296 /**
2297 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2298 * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2299 * MiniMRCluster instances with different log dirs.
2300 * @throws IOException When starting the cluster fails.
2301 */
2302 public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2303 // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
2304 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2305 "99.0");
2306 startMiniMapReduceCluster(2);
2307 return mrCluster;
2308 }
2309
2310 /**
2311 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2312 * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2313 * multiple MiniMRCluster instances with different log dirs.
2314 * @param servers The number of <code>TaskTracker</code>s to start.
2315 * @throws IOException When starting the cluster fails.
2316 */
2317 private void startMiniMapReduceCluster(final int servers) throws IOException {
2318 if (mrCluster != null) {
2319 throw new IllegalStateException("MiniMRCluster is already running");
2320 }
2321 LOG.info("Starting mini mapreduce cluster...");
2322 setupClusterTestDir();
2323 createDirsAndSetProperties();
2324
2325 //// hadoop2 specific settings
2326 // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2327 // We raise the allowed virtual memory so that processes don't get killed.
2328 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2329
2330 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2331 // this avoids the problem by disabling speculative task execution in tests.
2332 conf.setBoolean("mapreduce.map.speculative", false);
2333 conf.setBoolean("mapreduce.reduce.speculative", false);
2334 ////
2335
2336 // Yarn container runs in independent JVM.
We need to pass the argument manually here if the 2337 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 2338 if (JVM.getJVMSpecVersion() >= 17) { 2339 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2340 conf.set("yarn.app.mapreduce.am.command-opts", 2341 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2342 } 2343 2344 // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem 2345 // instance and cause trouble in HBase. See HBASE-29802 for more details. 2346 JobConf mrClusterConf = new JobConf(conf); 2347 mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true); 2348 // Allow the user to override FS URI for this map-reduce cluster to use. 2349 mrCluster = 2350 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2351 1, null, null, mrClusterConf); 2352 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2353 if (jobConf == null) { 2354 jobConf = mrCluster.createJobConf(); 2355 } 2356 2357 // Hadoop MiniMR overwrites this while it should not 2358 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2359 LOG.info("Mini mapreduce cluster started"); 2360 2361 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2362 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2363 // necessary config properties here. YARN-129 required adding a few properties. 2364 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2365 // this for mrv2 support; mr1 ignores this 2366 conf.set("mapreduce.framework.name", "yarn"); 2367 conf.setBoolean("yarn.is.minicluster", true); 2368 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2369 if (rmAddress != null) { 2370 conf.set("yarn.resourcemanager.address", rmAddress); 2371 } 2372 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2373 if (historyAddress != null) { 2374 conf.set("mapreduce.jobhistory.address", historyAddress); 2375 } 2376 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2377 if (schedulerAddress != null) { 2378 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2379 } 2380 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2381 if (mrJobHistoryWebappAddress != null) { 2382 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2383 } 2384 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2385 if (yarnRMWebappAddress != null) { 2386 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2387 } 2388 } 2389 2390 /** 2391 * Stops the previously started <code>MiniMRCluster</code>. 2392 */ 2393 public void shutdownMiniMapReduceCluster() { 2394 if (mrCluster != null) { 2395 LOG.info("Stopping mini mapreduce cluster..."); 2396 mrCluster.shutdown(); 2397 mrCluster = null; 2398 LOG.info("Mini mapreduce cluster stopped"); 2399 } 2400 // Restore configuration to point to local jobtracker 2401 conf.set("mapreduce.jobtracker.address", "local"); 2402 } 2403 2404 /** 2405 * Create a stubbed out RegionServerService, mainly for getting FS. 2406 */ 2407 public RegionServerServices createMockRegionServerService() throws IOException { 2408 return createMockRegionServerService((ServerName) null); 2409 } 2410 2411 /** 2412 * Create a stubbed out RegionServerService, mainly for getting FS. 
This version is used by
2413 * TestTokenAuthentication.
2414 */
2415 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2416 throws IOException {
2417 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2418 rss.setFileSystem(getTestFileSystem());
2419 rss.setRpcServer(rpc);
2420 return rss;
2421 }
2422
2423 /**
2424 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2425 * TestOpenRegionHandler.
2426 */
2427 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2428 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2429 rss.setFileSystem(getTestFileSystem());
2430 return rss;
2431 }
2432
2433 /**
2434 * Expire the Master's session
2435 */
2436 public void expireMasterSession() throws Exception {
2437 HMaster master = getMiniHBaseCluster().getMaster();
2438 expireSession(master.getZooKeeper(), false);
2439 }
2440
2441 /**
2442 * Expire a region server's session
2443 * @param index which RS
2444 */
2445 public void expireRegionServerSession(int index) throws Exception {
2446 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2447 expireSession(rs.getZooKeeper(), false);
2448 decrementMinRegionServerCount();
2449 }
2450
2451 private void decrementMinRegionServerCount() {
2452 // decrement the count for this.conf, for the newly spawned master
2453 // this.hbaseCluster shares this configuration too
2454 decrementMinRegionServerCount(getConfiguration());
2455
2456 // each master thread keeps a copy of configuration
2457 for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2458 decrementMinRegionServerCount(master.getMaster().getConfiguration());
2459 }
2460 }
2461
2462 private void decrementMinRegionServerCount(Configuration conf) {
2463 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2464 if (currentCount != -1) {
2465 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2466 }
2467 }
2468
2469 public void expireSession(ZKWatcher nodeZK) throws Exception {
2470 expireSession(nodeZK, false);
2471 }
2472
2473 /**
2474 * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2475 * http://hbase.apache.org/book.html#trouble.zookeeper
2476 * <p/>
2477 * There are issues when doing this:
2478 * <ol>
2479 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2480 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2481 * </ol>
2482 * @param nodeZK - the ZK watcher to expire
2483 * @param checkStatus - true to check if we can create a Table with the current configuration.
2484 */
2485 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2486 Configuration c = new Configuration(this.conf);
2487 String quorumServers = ZKConfig.getZKQuorumServersString(c);
2488 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2489 byte[] password = zk.getSessionPasswd();
2490 long sessionID = zk.getSessionId();
2491
2492 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2493 // so we create a first watcher to be sure that the
2494 // event was sent. We expect that if our watcher receives the event,
2495 // other watchers on the same machine will get it as well.
2496 // When we ask to close the connection, ZK does not close it before
2497 // we receive all the events, so we don't have to capture the event; just
2498 // closing the connection should be enough.
2499 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2500 @Override
2501 public void process(WatchedEvent watchedEvent) {
2502 LOG.info("Monitor ZKW received event=" + watchedEvent);
2503 }
2504 }, sessionID, password);
2505
2506 // Making it expire
2507 ZooKeeper newZK =
2508 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2509
2510 // ensure that we have a connection to the server before closing down, otherwise
2511 // the close session event will be eaten before we enter the CONNECTING state
2512 long start = EnvironmentEdgeManager.currentTime();
2513 while (
2514 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2515 ) {
2516 Thread.sleep(1);
2517 }
2518 newZK.close();
2519 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2520
2521 // Now closing & waiting to be sure that the clients get it.
2522 monitor.close();
2523
2524 if (checkStatus) {
2525 getConnection().getTable(TableName.META_TABLE_NAME).close();
2526 }
2527 }
2528
2529 /**
2530 * Get the Mini HBase cluster.
2531 * @return hbase cluster
2532 * @see #getHBaseClusterInterface()
2533 */
2534 public SingleProcessHBaseCluster getHBaseCluster() {
2535 return getMiniHBaseCluster();
2536 }
2537
2538 /**
2539 * Returns the HBaseCluster instance.
2540 * <p>
2541 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
2542 * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2543 * works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be used
2544 * instead w/o the need to type-cast.
2545 */
2546 public HBaseClusterInterface getHBaseClusterInterface() {
2547 // implementation note: we should rename this method as #getHBaseCluster(),
2548 // but this would require refactoring 90+ calls.
2549 return hbaseCluster;
2550 }
2551
2552 /**
2553 * Resets the connections so that the next time getConnection() is called, a new connection is
2554 * created. This is needed in cases where the entire cluster / all the masters are shutdown and
2555 * the connection is not valid anymore.
2556 * <p/>
2557 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2558 * written, not all start() stop() calls go through this class. Most tests directly operate on the
2559 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2560 * the connection state automatically. Cleaning this is a much bigger refactor.
2561 */
2562 public void invalidateConnection() throws IOException {
2563 closeConnection();
2564 // Update the master addresses if they changed.
2565 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2566 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2567 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2568 masterConfigBefore, masterConfAfter);
2569 conf.set(HConstants.MASTER_ADDRS_KEY,
2570 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2571 }
2572
2573 /**
2574 * Get a shared Connection to the cluster. This method is thread-safe.
2575 * @return A Connection that can be shared. Don't close it; it will be closed on shutdown of the cluster.
2576 */
2577 public Connection getConnection() throws IOException {
2578 return getAsyncConnection().toConnection();
2579 }
2580
2581 /**
2582 * Get an assigned Connection to the cluster. This method is thread-safe.
2583 * @param user assigned user
2584 * @return A Connection with the assigned user.
2585 */
2586 public Connection getConnection(User user) throws IOException {
2587 return getAsyncConnection(user).toConnection();
2588 }
2589
2590 /**
2591 * Get a shared AsyncClusterConnection to the cluster. This method is thread-safe.
2592 * @return An AsyncClusterConnection that can be shared. Don't close it; it will be closed on
2593 * shutdown of the cluster.
2594 */
2595 public AsyncClusterConnection getAsyncConnection() throws IOException {
2596 try {
2597 return asyncConnection.updateAndGet(connection -> {
2598 if (connection == null) {
2599 try {
2600 User user = UserProvider.instantiate(conf).getCurrent();
2601 connection = getAsyncConnection(user);
2602 } catch (IOException ioe) {
2603 throw new UncheckedIOException("Failed to create connection", ioe);
2604 }
2605 }
2606 return connection;
2607 });
2608 } catch (UncheckedIOException exception) {
2609 throw exception.getCause();
2610 }
2611 }
2612
2613 /**
2614 * Get an assigned AsyncClusterConnection to the cluster. This method is thread-safe.
2615 * @param user assigned user
2616 * @return An AsyncClusterConnection with the assigned user.
2617 */
2618 public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2619 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2620 }
2621
2622 public void closeConnection() throws IOException {
2623 if (hbaseAdmin != null) {
2624 Closeables.close(hbaseAdmin, true);
2625 hbaseAdmin = null;
2626 }
2627 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2628 if (asyncConnection != null) {
2629 Closeables.close(asyncConnection, true);
2630 }
2631 }
2632
2633 /**
2634 * Returns an Admin instance which is shared between HBaseTestingUtil instance users. Closing
2635 * it has no effect; it will be closed automatically when the cluster shuts down.
2636 */
2637 public Admin getAdmin() throws IOException {
2638 if (hbaseAdmin == null) {
2639 this.hbaseAdmin = getConnection().getAdmin();
2640 }
2641 return hbaseAdmin;
2642 }
2643
2644 private Admin hbaseAdmin = null;
2645
2646 /**
2647 * Returns an {@link Hbck} instance. Needs to be closed when done.
2648 */
2649 public Hbck getHbck() throws IOException {
2650 return getConnection().getHbck();
2651 }
2652
2653 /**
2654 * Unassign the named region.
2655 * @param regionName The region to unassign.
2656 */
2657 public void unassignRegion(String regionName) throws IOException {
2658 unassignRegion(Bytes.toBytes(regionName));
2659 }
2660
2661 /**
2662 * Unassign the named region.
2663 * @param regionName The region to unassign.
2664 */
2665 public void unassignRegion(byte[] regionName) throws IOException {
2666 getAdmin().unassign(regionName);
2667 }
2668
2669 /**
2670 * Closes the region containing the given row.
2671 * @param row The row to find the containing region.
2672 * @param table The table to find the region.
2673 */
2674 public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2675 unassignRegionByRow(Bytes.toBytes(row), table);
2676 }
2677
2678 /**
2679 * Closes the region containing the given row.
2680 * @param row The row to find the containing region.
2681 * @param table The table to find the region.
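 * <p>
 * Sketch (the locator is assumed to come from the shared connection):
 *
 * <pre>
 * try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
 *   util.unassignRegionByRow(Bytes.toBytes("row-42"), locator);
 * }
 * </pre>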
2682 */
2683 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2684 HRegionLocation hrl = table.getRegionLocation(row);
2685 unassignRegion(hrl.getRegion().getRegionName());
2686 }
2687
2688 /**
2689 * Retrieves a splittable region randomly from tableName
2690 * @param tableName name of table
2691 * @param maxAttempts maximum number of attempts, unlimited for value of -1
2692 * @return the HRegion chosen, null if none was found within limit of maxAttempts
2693 */
2694 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2695 List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2696 int regCount = regions.size();
2697 Set<Integer> attempted = new HashSet<>();
2698 int idx;
2699 int attempts = 0;
2700 do {
2701 regions = getHBaseCluster().getRegions(tableName);
2702 if (regCount != regions.size()) {
2703 // if there was region movement, clear attempted Set
2704 attempted.clear();
2705 }
2706 regCount = regions.size();
2707 // There are chances that before we get the region for the table from an RS the region may
2708 // be going for CLOSE. This may be because online schema change is enabled
2709 if (regCount > 0) {
2710 idx = ThreadLocalRandom.current().nextInt(regCount);
2711 // if we have just tried this region, there is no need to try again
2712 if (attempted.contains(idx)) {
2713 continue;
2714 }
2715 HRegion region = regions.get(idx);
2716 if (region.checkSplit().isPresent()) {
2717 return region;
2718 }
2719 attempted.add(idx);
2720 }
2721 attempts++;
2722 } while (maxAttempts == -1 || attempts < maxAttempts);
2723 return null;
2724 }
2725
2726 public MiniDFSCluster getDFSCluster() {
2727 return dfsCluster;
2728 }
2729
2730 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2731 setDFSCluster(cluster, true);
2732 }
2733
2734 /**
2735 * Set the MiniDFSCluster
2736 * @param cluster cluster to use
2737 * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2738 * is set.
2739 * @throws IllegalStateException if the passed cluster is up when it is required to be down
2740 * @throws IOException if the FileSystem could not be set from the passed dfs cluster
2741 */
2742 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2743 throws IllegalStateException, IOException {
2744 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2745 throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2746 }
2747 this.dfsCluster = cluster;
2748 this.setFs();
2749 }
2750
2751 public FileSystem getTestFileSystem() throws IOException {
2752 return HFileSystem.get(conf);
2753 }
2754
2755 /**
2756 * Wait until all regions in a table have been assigned. Waits the default timeout (30 seconds)
2757 * before giving up.
2758 * @param table Table to wait on.
2759 */
2760 public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2761 waitTableAvailable(table.getName(), 30000);
2762 }
2763
2764 public void waitTableAvailable(TableName table, long timeoutMillis)
2765 throws InterruptedException, IOException {
2766 waitFor(timeoutMillis, predicateTableAvailable(table));
2767 }
2768
2769 /**
2770 * Wait until all regions in a table have been assigned.
2771 * @param table Table to wait on.
2772 * @param timeoutMillis Timeout.
2773 */
2774 public void waitTableAvailable(byte[] table, long timeoutMillis)
2775 throws InterruptedException, IOException {
2776 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2777 }
2778
2779 public String explainTableAvailability(TableName tableName) throws IOException {
2780 StringBuilder msg =
2781 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2782 if (getHBaseCluster().getMaster().isAlive()) {
2783 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2784 .getRegionStates().getRegionAssignments();
2785 final List<Pair<RegionInfo, ServerName>> metaLocations =
2786 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2787 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2788 RegionInfo hri = metaLocation.getFirst();
2789 ServerName sn = metaLocation.getSecond();
2790 if (!assignments.containsKey(hri)) {
2791 msg.append(", region ").append(hri)
2792 .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
2793 } else if (sn == null) {
2794 msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
2795 } else if (!sn.equals(assignments.get(hri))) {
2796 msg.append(", region ").append(hri)
2797 .append(" assigned, but has different servers in meta and AM ( ").append(sn)
2798 .append(" <> ").append(assignments.get(hri)).append(" )");
2799 }
2800 }
2801 }
2802 return msg.toString();
2803 }
2804
2805 public String explainTableState(final TableName table, TableState.State state)
2806 throws IOException {
2807 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2808 if (tableState == null) {
2809 return "TableState in META: No table state in META for table " + table
2810 + "; last state in meta (including deleted) is " + findLastTableState(table);
2811 } else if (!tableState.inStates(state)) {
2812 return "TableState in META: Not " + state + " state, but " + tableState;
2813 } else {
2814 return "TableState in META: OK";
2815 }
2816 }
2817
2818 @Nullable
2819 public TableState findLastTableState(final TableName table) throws IOException {
2820 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2821 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2822 @Override
2823 public boolean visit(Result r) throws IOException {
2824 if (!Arrays.equals(r.getRow(), table.getName())) {
2825 return false;
2826 }
2827 TableState state = CatalogFamilyFormat.getTableState(r);
2828 if (state != null) {
2829 lastTableState.set(state);
2830 }
2831 return true;
2832 }
2833 };
2834 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2835 Integer.MAX_VALUE, visitor);
2836 return lastTableState.get();
2837 }
2838
2839 /**
2840 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2841 * regions have all been assigned. Will timeout after the default period (30 seconds). Tolerates
2842 * a nonexistent table.
2843 * @param table the table to wait on.
2844 * @throws InterruptedException if interrupted while waiting
2845 * @throws IOException if an IO problem is encountered
2846 */
2847 public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2848 waitTableEnabled(table, 30000);
2849 }
2850
2851 /**
2852 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2853 * regions have all been assigned.
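 * <p>
 * Sketch:
 *
 * <pre>
 * util.getAdmin().enableTable(tableName);
 * util.waitTableEnabled(tableName.getName(), 30000); // byte[] form of the table name
 * </pre>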
2854 * @see #waitTableEnabled(TableName, long)
2855 * @param table Table to wait on.
2856 * @param timeoutMillis Time to wait on it being marked enabled.
2857 */
2858 public void waitTableEnabled(byte[] table, long timeoutMillis)
2859 throws InterruptedException, IOException {
2860 waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2861 }
2862
2863 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2864 waitFor(timeoutMillis, predicateTableEnabled(table));
2865 }
2866
2867 /**
2868 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
2869 * timeout after the default period (30 seconds).
2870 * @param table Table to wait on.
2871 */
2872 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2873 waitTableDisabled(table, 30000);
2874 }
2875
2876 public void waitTableDisabled(TableName table, long millisTimeout)
2877 throws InterruptedException, IOException {
2878 waitFor(millisTimeout, predicateTableDisabled(table));
2879 }
2880
2881 /**
2882 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2883 * @param table Table to wait on.
2884 * @param timeoutMillis Time to wait on it being marked disabled.
2885 */
2886 public void waitTableDisabled(byte[] table, long timeoutMillis)
2887 throws InterruptedException, IOException {
2888 waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2889 }
2890
2891 /**
2892 * Make sure that at least the specified number of region servers are running.
2893 * @param num minimum number of region servers that should be running
2894 * @return true if we started some servers
2895 */
2896 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2897 boolean startedServer = false;
2898 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2899 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2900 LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2901 startedServer = true;
2902 }
2903
2904 return startedServer;
2905 }
2906
2907 /**
2908 * Make sure that at least the specified number of region servers are running. We don't count the
2909 * ones that are currently stopping or are stopped.
2910 * @param num minimum number of region servers that should be running
2911 * @return true if we started some servers
2912 */
2913 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2914 boolean startedServer = ensureSomeRegionServersAvailable(num);
2915
2916 int nonStoppedServers = 0;
2917 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2919 HRegionServer hrs = rst.getRegionServer();
2920 if (hrs.isStopping() || hrs.isStopped()) {
2921 LOG.info("A region server is stopped or stopping: " + hrs);
2922 } else {
2923 nonStoppedServers++;
2924 }
2925 }
2926 for (int i = nonStoppedServers; i < num; ++i) {
2927 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2928 startedServer = true;
2929 }
2930 return startedServer;
2931 }
2932
2933 /**
2934 * This method clones the passed <code>c</code> configuration, setting a new user into the clone.
2935 * Use it when getting new instances of FileSystem. Only works for DistributedFileSystem w/o
2936 * Kerberos.
2937 * @param c Initial configuration
2938 * @param differentiatingSuffix Suffix to differentiate this user from others.
2939 * @return A new User to use when getting FileSystem instances.
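 * <p>
 * Sketch: each simulated daemon in a test can obtain its own FileSystem instance.
 *
 * <pre>
 * User user = getDifferentUser(conf, ".hrs1");
 * FileSystem fs =
 *   user.runAs((PrivilegedExceptionAction&lt;FileSystem&gt;) () -&gt; FileSystem.get(conf));
 * </pre>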
  /**
   * This method clones the passed <code>c</code> configuration, setting a new user into the clone.
   * Use it when getting new instances of FileSystem. Only works for DistributedFileSystem w/o
   * Kerberos.
   * @param c                     Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new configuration instance with a different user set into it.
   */
  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
    throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
      return User.getCurrent();
    }
    // Else distributed filesystem. Make a new instance per daemon. Below
    // code is taken from the AppendTestUtil over in hdfs.
    String username = User.getCurrent().getName() + differentiatingSuffix;
    return User.createUserForTesting(c, username, new String[] { "supergroup" });
  }

  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      try {
        for (RegionInfo region : ProtobufUtil
          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
          online.add(region.getRegionNameAsString());
        }
      } catch (RegionServerStoppedException e) {
        // That's fine.
      }
    }
    return online;
  }

  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append, it's hard-coded to 5 and makes
   * tests linger. Here is the exception you'll see:
   *
   * <pre>
   * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block
   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
   * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683
   * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
   * </pre>
   *
   * @param stream A DFSClient.DFSOutputStream.
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
    try {
      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz : clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      LOG.info("Could not set max recovery field", e);
    }
  }

  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
   */
  public boolean assignRegion(final RegionInfo regionInfo)
    throws IOException, InterruptedException {
    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
    am.assign(regionInfo);
    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
  }
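  // Sketch: an unassign/assign round trip in a test (hypothetical; assumes a mini cluster and an
  // existing table "t1" with at least one region):
  //
  //   RegionInfo hri = getAdmin().getRegions(TableName.valueOf("t1")).get(0);
  //   getAdmin().unassign(hri.getRegionName());
  //   assertTrue("region should come back online", assignRegion(hri));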
  /**
   * Move region to destination server and wait till region is completely moved and online.
   * @param destRegion region to move
   * @param destServer destination server of the region
   */
  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
    throws InterruptedException, IOException {
    HMaster master = getMiniHBaseCluster().getMaster();
    // TODO: Here we start the move. The move can take a while.
    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
    while (true) {
      ServerName serverName =
        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
      if (serverName != null && serverName.equals(destServer)) {
        assertRegionOnServer(destRegion, serverName, 2000);
        break;
      }
      Thread.sleep(10);
    }
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been deployed,
   * and the master has been informed and has updated hbase:meta with each region's deployed
   * server.
   * @param tableName the table name
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
    waitUntilAllRegionsAssigned(tableName,
      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
  }

  /**
   * Wait until all of the system (hbase:meta) table's regions get assigned.
   */
  public void waitUntilAllSystemRegionsAssigned() throws IOException {
    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
  }
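  // Sketch: a test that creates a pre-split table typically waits for full assignment before
  // asserting on region placement (hypothetical names; assumes a mini cluster):
  //
  //   TableName tn = TableName.valueOf("presplit");
  //   createTable(tn, Bytes.toBytes("cf"), new byte[][] { Bytes.toBytes("m") });
  //   waitUntilAllRegionsAssigned(tn, 60000); // blocks until info:server is set for all regions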
  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, and the master has been informed and has
   * updated hbase:meta with each region's deployed server.
   * @param tableName the table name
   * @param timeout   timeout, in milliseconds
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
    if (!TableName.isMetaTableName(tableName)) {
      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
          + timeout + "ms");
        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
          @Override
          public String explainFailure() throws IOException {
            return explainTableAvailability(tableName);
          }

          @Override
          public boolean evaluate() throws IOException {
            Scan scan = new Scan();
            scan.addFamily(HConstants.CATALOG_FAMILY);
            boolean tableFound = false;
            try (ResultScanner s = meta.getScanner(scan)) {
              for (Result r; (r = s.next()) != null;) {
                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                RegionInfo info = RegionInfo.parseFromOrNull(b);
                if (info != null && info.getTable().equals(tableName)) {
                  // Get server hosting this region from catalog family. Return false if no server
                  // hosting this region, or if the server hosting this region was recently killed
                  // (for fault tolerance testing).
                  tableFound = true;
                  byte[] server =
                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
                  if (server == null) {
                    return false;
                  } else {
                    byte[] startCode =
                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
                    ServerName serverName =
                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
                        + Bytes.toLong(startCode));
                    if (
                      !getHBaseClusterInterface().isDistributedCluster()
                        && getHBaseCluster().isKilledRS(serverName)
                    ) {
                      return false;
                    }
                  }
                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
                    return false;
                  }
                }
              }
            }
            if (!tableFound) {
              LOG.warn(
                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
            }
            return tableFound;
          }
        });
      }
    }
    LOG.info("All regions for table " + tableName + " assigned in meta. Checking AM states.");
    // check from the master state if we are using a mini cluster
    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table but make sure master knows of the assignments before
      // returning -- sometimes this can lag.
      HMaster master = getHBaseCluster().getMaster();
      final RegionStates states = master.getAssignmentManager().getRegionStates();
      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
        @Override
        public String explainFailure() throws IOException {
          return explainTableAvailability(tableName);
        }

        @Override
        public boolean evaluate() throws IOException {
          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
          return hris != null && !hris.isEmpty();
        }
      });
    }
    LOG.info("All regions for table " + tableName + " assigned.");
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods
   * of querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
      // readpoint 0.
      0);

    List<Cell> result = new ArrayList<>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // verify that we are on the row we want:
      Cell kv = result.get(0);
      if (!CellUtil.matchingRows(kv, get.getRow())) {
        result.clear();
      }
    }
    scanner.close();
    return result;
  }

  /**
   * Create region split keys between startKey and endKey.
   * @param numRegions the number of regions to be created. It has to be greater than 3.
   * @return the resulting split start keys
   */
  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
    assertTrue(numRegions > 3);
    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
    byte[][] result = new byte[tmpSplitKeys.length + 1][];
    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
    result[0] = HConstants.EMPTY_BYTE_ARRAY;
    return result;
  }
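  // Worked example for getRegionSplitStartKeys (values illustrative): asking for 5 regions over
  // ["aaa", "zzz") yields an array of 5 start keys; the first region starts at
  // HConstants.EMPTY_BYTE_ARRAY and the remaining boundaries come from
  // Bytes.split(startKey, endKey, numRegions - 3):
  //
  //   byte[][] starts = getRegionSplitStartKeys(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 5);
  //   assertEquals(5, starts.length);
  //   assertEquals(0, starts[0].length); // empty start key for the first region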
  /**
   * Do a small get/scan against one store. This is required because store has no actual methods
   * of querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
    throws IOException {
    Get get = new Get(row);
    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
    s.put(store.getColumnFamilyDescriptor().getName(), columns);

    return getFromStoreFile(store, get);
  }

  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
    final List<? extends Cell> actual) {
    final int eLen = expected.size();
    final int aLen = actual.size();
    final int minLen = Math.min(eLen, aLen);

    int i = 0;
    while (
      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
    ) {
      i++;
    }

    if (additionalMsg == null) {
      additionalMsg = "";
    }
    if (!additionalMsg.isEmpty()) {
      additionalMsg = ". " + additionalMsg;
    }

    if (eLen != aLen || i != minLen) {
      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
        + " (length " + aLen + ")" + additionalMsg);
    }
  }

  public static <T> String safeGetAsStr(List<T> lst, int i) {
    if (0 <= i && i < lst.size()) {
      return lst.get(i).toString();
    } else {
      return "<out_of_range>";
    }
  }

  public String getRpcConnnectionURI() throws UnknownHostException {
    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
  }

  public String getZkConnectionURI() {
    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }
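  // Sketch: connecting with a connection URI rather than the deprecated cluster key. This assumes
  // an HBase version whose ConnectionFactory accepts a connection URI (e.g. a
  // ConnectionFactory.createConnection(java.net.URI) overload); check your version before relying
  // on it:
  //
  //   java.net.URI uri = new java.net.URI(getZkConnectionURI());
  //   try (Connection conn = ConnectionFactory.createConnection(uri)) {
  //     // use conn...
  //   }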
  /**
   * Get the zk based cluster key for this cluster.
   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
   *             connection info of a cluster. Keep here only for compatibility.
   * @see #getRpcConnnectionURI()
   * @see #getZkConnectionURI()
   */
  @Deprecated
  public String getClusterKey() {
    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + ":"
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }

  /**
   * Creates a random table with the given parameters.
   */
  public Table createRandomTable(TableName tableName, final Collection<String> families,
    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
    final int numRowsPerFlush) throws IOException, InterruptedException {
    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
      + maxVersions + "\n");

    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
    final int numCF = families.size();
    final byte[][] cfBytes = new byte[numCF][];
    {
      int cfIndex = 0;
      for (String cf : families) {
        cfBytes[cfIndex++] = Bytes.toBytes(cf);
      }
    }

    final int actualStartKey = 0;
    final int actualEndKey = Integer.MAX_VALUE;
    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
    final int splitStartKey = actualStartKey + keysPerRegion;
    final int splitEndKey = actualEndKey - keysPerRegion;
    final String keyFormat = "%08x";
    final Table table = createTable(tableName, cfBytes, maxVersions,
      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);

    if (hbaseCluster != null) {
      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
    }

    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);

    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
        final byte[] row = Bytes.toBytes(
          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));

        Put put = new Put(row);
        Delete del = new Delete(row);
        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
          final byte[] cf = cfBytes[rand.nextInt(numCF)];
          final long ts = rand.nextInt();
          final byte[] qual = Bytes.toBytes("col" + iCol);
          if (rand.nextBoolean()) {
            final byte[] value =
              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
            put.addColumn(cf, qual, ts, value);
          } else if (rand.nextDouble() < 0.8) {
            del.addColumn(cf, qual, ts);
          } else {
            del.addColumns(cf, qual, ts);
          }
        }

        if (!put.isEmpty()) {
          mutator.mutate(put);
        }

        if (!del.isEmpty()) {
          mutator.mutate(del);
        }
      }
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      mutator.flush();
      if (hbaseCluster != null) {
        getMiniHBaseCluster().flushcache(table.getName());
      }
    }
    mutator.close();

    return table;
  }
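  // Sketch: createRandomTable is typically used to produce a table with many storefiles for
  // compaction/scan tests (values hypothetical):
  //
  //   Table t = createRandomTable(TableName.valueOf("rand"),
  //     Arrays.asList("cf1", "cf2"), // families
  //     3,    // maxVersions
  //     10,   // numColsPerRow
  //     4,    // numFlushes -> roughly 4 storefiles per region
  //     8,    // numRegions
  //     100); // numRowsPerFlush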
  public static int randomFreePort() {
    return HBaseCommonTestingUtil.randomFreePort();
  }

  public static String randomMultiCastAddress() {
    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
  }

  public static void waitForHostPort(String host, int port) throws IOException {
    final int maxTimeMs = 10000;
    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
    IOException savedException = null;
    LOG.info("Waiting for server at " + host + ":" + port);
    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
      try {
        Socket sock = new Socket(InetAddress.getByName(host), port);
        sock.close();
        savedException = null;
        LOG.info("Server at " + host + ":" + port + " is available");
        break;
      } catch (UnknownHostException e) {
        throw new IOException("Failed to look up " + host, e);
      } catch (IOException e) {
        savedException = e;
      }
      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
    }

    if (savedException != null) {
      throw savedException;
    }
  }

  public static int getMetaRSPort(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
    }
  }

  /**
   * Due to an async racing issue, a region may not yet be in a region server's online region list
   * even after the assignment znode is deleted and the new assignment is recorded in the master.
   */
  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
        return;
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) {
        break;
      }
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }
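  // Sketch: waitForHostPort is handy when a test spawns an external socket service and must not
  // race its startup. "startMyTestService" is a hypothetical helper that binds the port
  // asynchronously:
  //
  //   int port = randomFreePort();
  //   startMyTestService(port);
  //   waitForHostPort("127.0.0.1", port); // retries until the socket accepts, up to ~10s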
  /**
   * Check to make sure the region is open on the specified region server, but not on any other
   * one.
   */
  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
        List<JVMClusterUtil.RegionServerThread> rsThreads =
          getHBaseCluster().getLiveRegionServerThreads();
        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
          HRegionServer rs = rsThread.getRegionServer();
          if (server.equals(rs.getServerName())) {
            continue;
          }
          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
          for (HRegion r : hrs) {
            assertTrue("Region should not be double assigned",
              r.getRegionInfo().getRegionId() != hri.getRegionId());
          }
        }
        return; // good, we are happy
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) {
        break;
      }
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
  }

  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
    BlockCache blockCache) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
  }

  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }

  /**
   * Returns a {@link Predicate} for checking that there are no regions in transition in the
   * master.
   */
  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
        return "found in transition: " + am.getRegionsInTransition().toString();
      }

      @Override
      public boolean evaluate() throws IOException {
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) {
          return false;
        }
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) {
          return false;
        }
        return !am.hasRegionsInTransition();
      }
    };
  }
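  // Sketch: the predicate factories here pair with waitFor(...); a custom ExplainingPredicate
  // follows the same shape (condition and table name hypothetical):
  //
  //   waitFor(60000, 200, new ExplainingPredicate<IOException>() {
  //     @Override
  //     public boolean evaluate() throws IOException {
  //       return getAdmin().getRegions(TableName.valueOf("t1")).size() == 8;
  //     }
  //
  //     @Override
  //     public String explainFailure() throws IOException {
  //       return "table t1 does not have 8 regions yet";
  //     }
  //   });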
  /**
   * Returns a {@link Predicate} for checking that there are no region transit procedures scheduled
   * in the master.
   */
  public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
        return "Number of procedures scheduled for region transit: "
          + am.getRegionTransitScheduledCount();
      }

      @Override
      public boolean evaluate() throws IOException {
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) {
          return false;
        }
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) {
          return false;
        }
        return am.getRegionTransitScheduledCount() == 0;
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that a table is enabled.
   */
  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.ENABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that a table is disabled.
   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.DISABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().isTableDisabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that a table is available.
   */
  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          try (Table table = getConnection().getTable(tableName)) {
            TableDescriptor htd = table.getDescriptor();
            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
              .getAllRegionLocations()) {
              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
              for (byte[] family : htd.getColumnFamilyNames()) {
                scan.addFamily(family);
              }
              try (ResultScanner scanner = table.getScanner(scan)) {
                scanner.next();
              }
            }
          }
        }
        return tableAvailable;
      }
    };
  }

  /**
   * Wait until there are no regions in transition.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionsInTransition());
  }

  /**
   * Wait until no region transit procedures are scheduled.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionTransitScheduled());
  }
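  // Sketch: tests that move or merge regions usually quiesce the cluster before asserting
  // ("hri" and "destServer" are hypothetical variables from the surrounding test):
  //
  //   getAdmin().move(hri.getEncodedNameAsBytes(), destServer);
  //   waitUntilNoRegionsInTransition(60000); // all RIT cleared
  //   assertRegionOnServer(hri, destServer, 2000);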
  /**
   * Wait until there are no regions in transition (time limit 15 minutes).
   */
  public void waitUntilNoRegionsInTransition() throws IOException {
    waitUntilNoRegionsInTransition(15 * 60000);
  }

  /**
   * Wait until no TRSP (TransitRegionStateProcedure) is present (time limit 15 minutes).
   */
  public void waitUntilNoRegionTransitScheduled() throws IOException {
    waitUntilNoRegionTransitScheduled(15 * 60000);
  }

  /**
   * Wait until the given labels are ready in VisibilityLabelsCache.
   */
  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {

      @Override
      public boolean evaluate() {
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() {
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return label + " is not available yet";
          }
        }
        return "";
      }
    });
  }

  /**
   * Create a set of column descriptors with all combinations of the available compression,
   * encoding, and bloom filter codecs.
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
    return generateColumnDescriptors("");
  }

  /**
   * Create a set of column descriptors with all combinations of the available compression,
   * encoding, and bloom filter codecs.
   * @param prefix family names prefix
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    long familyId = 0;
    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
        for (BloomType bloomType : BloomType.values()) {
          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
          familyId++;
        }
      }
    }
    return columnFamilyDescriptors;
  }
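  // Sketch: exercising every supported compression/encoding/bloom combination in one table
  // (table name hypothetical; the family count grows multiplicatively, so this suits functional
  // coverage tests rather than performance tests):
  //
  //   TableDescriptorBuilder builder =
  //     TableDescriptorBuilder.newBuilder(TableName.valueOf("allCodecs"));
  //   for (ColumnFamilyDescriptor cfd : generateColumnDescriptors("t")) {
  //     builder.setColumnFamily(cfd);
  //   }
  //   getAdmin().createTable(builder.build());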
  /**
   * Get supported compression algorithms.
   * @return supported compression algorithms.
   */
  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
    for (String algoName : allAlgos) {
      try {
        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
        algo.getCompressor();
        supportedAlgos.add(algo);
      } catch (Throwable t) {
        // this algo is not available
      }
    }
    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
  }

  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
    Scan scan = new Scan().withStartRow(row);
    scan.setReadType(ReadType.PREAD);
    scan.setCaching(1);
    scan.setReversed(true);
    scan.addFamily(family);
    try (RegionScanner scanner = r.getScanner(scan)) {
      List<Cell> cells = new ArrayList<>(1);
      scanner.next(cells);
      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
        return null;
      }
      return Result.create(cells);
    }
  }

  private boolean isTargetTable(final byte[] inRow, Cell c) {
    String inputRowString = Bytes.toString(inRow);
    int i = inputRowString.indexOf(HConstants.DELIMITER);
    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
    int o = outputRowString.indexOf(HConstants.DELIMITER);
    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
  }
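  // Sketch: getClosestRowBefore emulates the old closest-row-before lookup via a reversed PREAD
  // scan; useful when probing hbase:meta directly (row value hypothetical):
  //
  //   HRegion metaRegion = getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).get(0);
  //   Result r = getClosestRowBefore(metaRegion, Bytes.toBytes("t1,,99999999999999"),
  //     HConstants.CATALOG_FAMILY);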
  /**
   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
   * kerby KDC server and utility for using it,
   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It came in in HBASE-5291.
   */
  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
    Properties conf = MiniKdc.createConf();
    conf.put(MiniKdc.DEBUG, true);
    MiniKdc kdc = null;
    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It's possible that
    // another service captures the port in between, which will result in a BindException.
    boolean bindException;
    int numTries = 0;
    do {
      try {
        bindException = false;
        dir = new File(getDataTestDir("kdc").toUri().getPath());
        kdc = new MiniKdc(conf, dir);
        kdc.start();
      } catch (BindException e) {
        FileUtils.deleteDirectory(dir); // clean directory
        numTries++;
        if (numTries == 3) {
          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
          throw e;
        }
        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
        bindException = true;
      }
    } while (bindException);
    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
    return kdc;
  }

  public int getNumHFiles(final TableName tableName, final byte[] family) {
    int numHFiles = 0;
    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
    }
    return numHFiles;
  }

  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
    final byte[] family) {
    int numHFiles = 0;
    for (Region region : rs.getRegions(tableName)) {
      numHFiles += region.getStore(family).getStorefilesCount();
    }
    return numHFiles;
  }

  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
    assertEquals(ltdFamilies.size(), rtdFamilies.size());
    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
        it2 = rtdFamilies.iterator(); it.hasNext();) {
      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
    }
  }

  /**
   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
   * invocations.
   */
  public static void await(final long sleepMillis, final BooleanSupplier condition)
    throws InterruptedException {
    try {
      while (!condition.getAsBoolean()) {
        Thread.sleep(sleepMillis);
      }
    } catch (RuntimeException e) {
      if (e.getCause() instanceof AssertionError) {
        throw (AssertionError) e.getCause();
      }
      throw e;
    }
  }

  public void createRegionDir(RegionInfo hri) throws IOException {
    Path rootDir = getDataTestDir();
    Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable());
    Path regionDir = new Path(tableDir, hri.getEncodedName());
    FileSystem fs = getTestFileSystem();
    if (!fs.exists(regionDir)) {
      fs.mkdirs(regionDir);
    }
  }

  public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem)
    throws IOException {
    Path tableDir =
      CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable());
    HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir,
      regionInfo);
  }
}