/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the
 * real cluster.
 * <p/>
 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir",
 * setting it to true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
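  /*
   * Illustrative sketch, not part of the original class: MEMSTORETS_TAGS_PARAMETRIZED is intended
   * to feed a JUnit parameterized runner. A hypothetical consumer might look like:
   *
   *   @RunWith(Parameterized.class)
   *   public class TestWithMemstoreTSAndTags {
   *     @Parameterized.Parameters
   *     public static List<Object[]> params() {
   *       return HBaseTestingUtil.MEMSTORETS_TAGS_PARAMETRIZED;
   *     }
   *
   *     private final boolean useMemstoreTS;
   *     private final boolean useTags;
   *
   *     public TestWithMemstoreTSAndTags(boolean useMemstoreTS, boolean useTags) {
   *       this.useMemstoreTS = useMemstoreTS;
   *       this.useTags = useTags;
   *     }
   *   }
   */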
  /**
   * Checks to see if a specific port is available.
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create combination of memstoreTS and tags
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtility using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }
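  /*
   * Illustrative lifecycle sketch, not part of the original class: a typical test brings the mini
   * cluster up once per class and tears it down afterwards. A hypothetical test skeleton:
   *
   *   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   *
   *   @BeforeClass
   *   public static void setUpBeforeClass() throws Exception {
   *     TEST_UTIL.startMiniCluster();
   *   }
   *
   *   @AfterClass
   *   public static void tearDownAfterClass() throws Exception {
   *     TEST_UTIL.shutdownMiniCluster();
   *   }
   */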
  /**
   * <p>
   * Create an HBaseTestingUtility using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true, so that tests have to
    // explicitly opt out of random port assignment
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Start a mini secure cluster with the given kdc and principals.
   * @param kdc              Mini kdc server
   * @param servicePrincipal Service principal without realm.
   * @param spnegoPrincipal  Spnego principal without realm.
   * @return Handle to shut down the cluster
   */
  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
    String spnegoPrincipal) throws Exception {
    Configuration conf = getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
      spnegoPrincipal + '@' + kdc.getRealm());

    startMiniCluster();
    try {
      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      shutdownMiniCluster();
      throw e;
    }

    return this::shutdownMiniCluster;
  }
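  /*
   * Illustrative sketch, with a hypothetical MiniKdc setup elided: since the returned Closeable
   * shuts the cluster down, try-with-resources gives a tidy secure-cluster test:
   *
   *   MiniKdc kdc = ...; // started and configured elsewhere
   *   try (Closeable cluster =
   *     TEST_UTIL.startSecureMiniCluster(kdc, "hbase/localhost", "HTTP/localhost")) {
   *     // exercise secured code paths here
   *   }
   */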
  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If, say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than using the return value directly, it's usually best to make a copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but nothing can be done about it at the moment; single instance only is how
   * the minidfscluster works. We also create the underlying directory names for hadoop.log.dir,
   * mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the conf, and as a system
   * property for hadoop.tmp.dir (We do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Erasing configuration value by system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the
   *         test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @param subdirName name of the subdir to create under the base test dir
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already
   * setup.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) {
        dataTestDir.deleteOnExit();
      }
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) {
        fs.deleteOnExit(newDataTestDir);
      }
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }
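  /*
   * Illustrative sketch with hypothetical names: tests that need scratch space on the test
   * filesystem can combine getDataTestDirOnTestFS with getTestFileSystem:
   *
   *   Path scratch = TEST_UTIL.getDataTestDirOnTestFS("my-test-output");
   *   FileSystem fs = TEST_UTIL.getTestFileSystem();
   *   try (FSDataOutputStream out = fs.create(new Path(scratch, "part-0"))) {
   *     out.writeUTF("hello");
   *   }
   */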
  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames DNs to run on.
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames DNs to run on.
   * @return The mini dfs cluster created.
   * @see #shutdownMiniDFSCluster()
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version is > 3.2.3 or 3.3.4. "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying this parameter on the command line. If not set, the default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS
   * settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }
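  /*
   * Illustrative sketch: short-circuit reads can be forced on for a run through the system
   * property checked by isReadShortCircuitOn(), set before the mini cluster starts:
   *
   *   System.setProperty("hbase.tests.use.shortcircuit.reads", "true");
   *   TEST_UTIL.startMiniCluster();
   */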
  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given slave node number.
   * All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies the
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under the System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }
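  /*
   * Illustrative sketch: non-default topologies go through StartTestingClusterOption's builder,
   * e.g. two masters, three region servers and three datanodes:
   *
   *   StartTestingClusterOption option = StartTestingClusterOption.builder()
   *     .numMasters(2).numRegionServers(3).numDataNodes(3).build();
   *   TEST_UTIL.startMiniCluster(option);
   */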
  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the server wait until this exact number of
    // region servers have connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .rsPorts(rsPorts).createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
  }

  /**
   * Returns the current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }
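  /*
   * Illustrative sketch: to bounce only HBase while keeping dfs/zk up, pair
   * shutdownMiniHBaseCluster() with restartHBaseCluster():
   *
   *   TEST_UTIL.shutdownMiniHBaseCluster();
   *   TEST_UTIL.restartHBaseCluster(1); // bring back a single region server
   */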
  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or
   * not. If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
   * startup. You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note: Directory will be made irrespective of whether
   *               path has been fetched or not. If directory already exists, it will be overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
   * flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
   * this method if you were doing manual operation.
   * @return Fully qualified path to the created WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) {
      throw new IOException("Must create at least 3 regions");
    }
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }
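  /*
   * Illustrative sketch with hypothetical table and family names: creating a table and touching a
   * row; createTable blocks until all regions are assigned:
   *
   *   TableName tn = TableName.valueOf("testTable");
   *   byte[] fam = Bytes.toBytes("f");
   *   try (Table table = TEST_UTIL.createTable(tn, fam)) {
   *     table.put(new Put(Bytes.toBytes("r1"))
   *       .addColumn(fam, Bytes.toBytes("q"), Bytes.toBytes("v")));
   *     Result result = table.get(new Get(Bytes.toBytes("r1")));
   *     assertTrue(!result.isEmpty());
   *   }
   */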
  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the splitkeys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException throws IOException
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[] startKey, byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param type      Bloom type
   * @param blockSize block size
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    BloomType type, int blockSize, Configuration c) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setBloomFilterType(type).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfdb.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfdb.build());
    }
    TableDescriptor td = builder.build();
    if (splitKeys != null) {
      getAdmin().createTable(td, splitKeys);
    } else {
      getAdmin().createTable(td);
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(td.getTableName());
    return getConnection().getTable(td.getTableName());
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param splitRows array of split keys
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    if (isNewVersionBehaviorEnabled()) {
      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        builder.setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
      }
    }
    if (splitRows != null) {
      getAdmin().createTable(builder.build(), splitRows);
    } else {
      getAdmin().createTable(builder.build());
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(htd.getTableName());
    return getConnection().getTable(htd.getTableName());
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the replica count
   * @param c            Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount, final Configuration c) throws IOException {
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
    return createTable(htd, families, splitKeys, c);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, int numVersions)
    throws IOException {
    return createTable(tableName, new byte[][] { family }, numVersions);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, (byte[][]) null);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
1435 */ 1436 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1437 byte[][] splitKeys) throws IOException { 1438 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1439 for (byte[] family : families) { 1440 ColumnFamilyDescriptorBuilder cfBuilder = 1441 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1442 if (isNewVersionBehaviorEnabled()) { 1443 cfBuilder.setNewVersionBehavior(true); 1444 } 1445 builder.setColumnFamily(cfBuilder.build()); 1446 } 1447 if (splitKeys != null) { 1448 getAdmin().createTable(builder.build(), splitKeys); 1449 } else { 1450 getAdmin().createTable(builder.build()); 1451 } 1452 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1453 // assigned 1454 waitUntilAllRegionsAssigned(tableName); 1455 return getConnection().getTable(tableName); 1456 } 1457 1458 /** 1459 * Create a table with multiple regions. 1460 * @return A Table instance for the created table. 1461 */ 1462 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1463 throws IOException { 1464 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1465 } 1466 1467 /** 1468 * Create a table. 1469 * @return A Table instance for the created table. 1470 */ 1471 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1472 throws IOException { 1473 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1474 for (byte[] family : families) { 1475 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1476 .setMaxVersions(numVersions).setBlocksize(blockSize); 1477 if (isNewVersionBehaviorEnabled()) { 1478 cfBuilder.setNewVersionBehavior(true); 1479 } 1480 builder.setColumnFamily(cfBuilder.build()); 1481 } 1482 getAdmin().createTable(builder.build()); 1483 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1484 // assigned 1485 waitUntilAllRegionsAssigned(tableName); 1486 return getConnection().getTable(tableName); 1487 } 1488 1489 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1490 String cpName) throws IOException { 1491 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1492 for (byte[] family : families) { 1493 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1494 .setMaxVersions(numVersions).setBlocksize(blockSize); 1495 if (isNewVersionBehaviorEnabled()) { 1496 cfBuilder.setNewVersionBehavior(true); 1497 } 1498 builder.setColumnFamily(cfBuilder.build()); 1499 } 1500 if (cpName != null) { 1501 builder.setCoprocessor(cpName); 1502 } 1503 getAdmin().createTable(builder.build()); 1504 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1505 // assigned 1506 waitUntilAllRegionsAssigned(tableName); 1507 return getConnection().getTable(tableName); 1508 } 1509 1510 /** 1511 * Create a table. 1512 * @return A Table instance for the created table. 
1513 */ 1514 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1515 throws IOException { 1516 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1517 int i = 0; 1518 for (byte[] family : families) { 1519 ColumnFamilyDescriptorBuilder cfBuilder = 1520 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1521 if (isNewVersionBehaviorEnabled()) { 1522 cfBuilder.setNewVersionBehavior(true); 1523 } 1524 builder.setColumnFamily(cfBuilder.build()); 1525 i++; 1526 } 1527 getAdmin().createTable(builder.build()); 1528 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1529 // assigned 1530 waitUntilAllRegionsAssigned(tableName); 1531 return getConnection().getTable(tableName); 1532 } 1533 1534 /** 1535 * Create a table. 1536 * @return A Table instance for the created table. 1537 */ 1538 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1539 throws IOException { 1540 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1541 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1542 if (isNewVersionBehaviorEnabled()) { 1543 cfBuilder.setNewVersionBehavior(true); 1544 } 1545 builder.setColumnFamily(cfBuilder.build()); 1546 getAdmin().createTable(builder.build(), splitRows); 1547 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1548 // assigned 1549 waitUntilAllRegionsAssigned(tableName); 1550 return getConnection().getTable(tableName); 1551 } 1552 1553 /** 1554 * Create a table with multiple regions. 1555 * @return A Table instance for the created table. 1556 */ 1557 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1558 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1559 } 1560 1561 /** 1562 * Set the number of Region replicas. 1563 */ 1564 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1565 throws IOException, InterruptedException { 1566 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1567 .setRegionReplication(replicaCount).build(); 1568 admin.modifyTable(desc); 1569 } 1570 1571 /** 1572 * Set the number of Region replicas. 
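   * <p>
   * A minimal usage sketch (the table name and replica count are hypothetical). Note the future
   * returned by the underlying {@code modifyTable} call is waited on, so the method blocks until
   * the change is accepted:
   * </p>
   *
   * <pre>
   * setReplicas(asyncAdmin, TableName.valueOf("testtable"), 3);
   * </pre>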
1573 */ 1574 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1575 throws ExecutionException, IOException, InterruptedException { 1576 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1577 .setRegionReplication(replicaCount).build(); 1578 admin.modifyTable(desc).get(); 1579 } 1580 1581 /** 1582 * Drop an existing table 1583 * @param tableName existing table 1584 */ 1585 public void deleteTable(TableName tableName) throws IOException { 1586 try { 1587 getAdmin().disableTable(tableName); 1588 } catch (TableNotEnabledException e) { 1589 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1590 } 1591 getAdmin().deleteTable(tableName); 1592 } 1593 1594 /** 1595 * Drop an existing table 1596 * @param tableName existing table 1597 */ 1598 public void deleteTableIfAny(TableName tableName) throws IOException { 1599 try { 1600 deleteTable(tableName); 1601 } catch (TableNotFoundException e) { 1602 // ignore 1603 } 1604 } 1605 1606 // ========================================================================== 1607 // Canned table and table descriptor creation 1608 1609 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1610 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1611 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1612 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1613 private static final int MAXVERSIONS = 3; 1614 1615 public static final char FIRST_CHAR = 'a'; 1616 public static final char LAST_CHAR = 'z'; 1617 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1618 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1619 1620 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1621 return createModifyableTableDescriptor(TableName.valueOf(name), 1622 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1623 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1624 } 1625 1626 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1627 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1628 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1629 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1630 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1631 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1632 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1633 if (isNewVersionBehaviorEnabled()) { 1634 cfBuilder.setNewVersionBehavior(true); 1635 } 1636 builder.setColumnFamily(cfBuilder.build()); 1637 } 1638 return builder.build(); 1639 } 1640 1641 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1642 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1643 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1644 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1645 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1646 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1647 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1648 if (isNewVersionBehaviorEnabled()) { 1649 cfBuilder.setNewVersionBehavior(true); 1650 } 1651 builder.setColumnFamily(cfBuilder.build()); 
    }
    return builder;
  }

  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
   */
  public TableDescriptor createTableDescriptor(final TableName name) {
    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }

  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
    return createTableDescriptor(tableName, new byte[][] { family }, 1);
  }

  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
    int maxVersions) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
      if (isNewVersionBehaviorEnabled()) {
        cfBuilder.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfBuilder.build());
    }
    return builder.build();
  }

  /**
   * Create an HRegion that writes to the local tmp dirs.
   * @param desc a table descriptor indicating which table the region belongs to
   * @param startKey the start boundary of the region
   * @param endKey the end boundary of the region
   * @return a region that writes to local dir for testing
   */
  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
    throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
      .setEndKey(endKey).build();
    return createLocalHRegion(hri, desc);
  }

  /**
   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
   */
  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
  }

  /**
   * Create an HRegion that writes to the local tmp dirs with the specified WAL.
   * @param info region info
   * @param conf configuration
   * @param desc table descriptor
   * @param wal wal for this region.
   * @return created hregion
   */
  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
    WAL wal) throws IOException {
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
  }
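  // A minimal lifecycle sketch for the local-region helpers above (table and family names are
  // hypothetical). Every region obtained this way must be released with closeRegionAndWAL:
  //
  //   TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
  //     .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
  //   HRegion region = createLocalHRegion(td, HConstants.EMPTY_BYTE_ARRAY,
  //     HConstants.EMPTY_BYTE_ARRAY);
  //   try {
  //     // ... exercise the region ...
  //   } finally {
  //     HBaseTestingUtil.closeRegionAndWAL(region);
  //   }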
  /**
   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
   *         when done.
   */
  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
    throws IOException {
    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
      durability, wal, null, families);
  }

  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
    boolean[] compactedMemStore, byte[]... families) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setReadOnly(isReadOnly);
    int i = 0;
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
      i++;
      // Keep every cell version around; tests inspect multiple versions.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
      builder.setColumnFamily(cfBuilder.build());
    }
    builder.setDurability(durability);
    RegionInfo info =
      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
    return createLocalHRegion(info, conf, builder.build(), wal);
  }

  //
  // ==========================================================================

  /**
   * Provide an existing table name to truncate. Scans the table and issues a delete for each row
   * read.
   * @param tableName existing table
   * @return HTable to that new table
   */
  public Table deleteTableData(TableName tableName) throws IOException {
    Table table = getConnection().getTable(tableName);
    Scan scan = new Scan();
    // Close the scanner when done; the original reopened a second scanner for no effect.
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        Delete del = new Delete(res.getRow());
        table.delete(del);
      }
    }
    return table;
  }

  /**
   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
   * table.
   * @param tableName table which must exist.
   * @param preserveRegions keep the existing split points
   * @return HTable for the new table
   */
  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
    throws IOException {
    Admin admin = getAdmin();
    if (!admin.isTableDisabled(tableName)) {
      admin.disableTable(tableName);
    }
    admin.truncateTable(tableName, preserveRegions);
    return getConnection().getTable(tableName);
  }

  /**
   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
   * preserve regions of existing table.
   * @param tableName table which must exist.
   * @return HTable for the new table
   */
  public Table truncateTable(final TableName tableName) throws IOException {
    return truncateTable(tableName, false);
  }

  /**
   * Load table with rows from 'aaa' to 'zzz'.
   * @param t Table
   * @param f Family
   * @return Count of rows loaded.
   */
  public int loadTable(final Table t, final byte[] f) throws IOException {
    return loadTable(t, new byte[][] { f });
  }

  /**
   * Load table with rows from 'aaa' to 'zzz'.
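   * <p>
   * Rows come from {@link #ROWS}, i.e. every three-letter combination from 'aaa' to 'zzz' (26^3 =
   * 17,576 rows), so the returned count is fixed. Passing {@code writeToWAL = false} skips the WAL
   * via {@link Durability#SKIP_WAL}, which speeds up setup for tests that do not exercise
   * recovery.
   * </p>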
1819 * @param t Table 1820 * @param f Family 1821 * @return Count of rows loaded. 1822 */ 1823 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1824 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1825 } 1826 1827 /** 1828 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1829 * @param t Table 1830 * @param f Array of Families to load 1831 * @return Count of rows loaded. 1832 */ 1833 public int loadTable(final Table t, final byte[][] f) throws IOException { 1834 return loadTable(t, f, null); 1835 } 1836 1837 /** 1838 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1839 * @param t Table 1840 * @param f Array of Families to load 1841 * @param value the values of the cells. If null is passed, the row key is used as value 1842 * @return Count of rows loaded. 1843 */ 1844 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1845 return loadTable(t, f, value, true); 1846 } 1847 1848 /** 1849 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1850 * @param t Table 1851 * @param f Array of Families to load 1852 * @param value the values of the cells. If null is passed, the row key is used as value 1853 * @return Count of rows loaded. 1854 */ 1855 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1856 throws IOException { 1857 List<Put> puts = new ArrayList<>(); 1858 for (byte[] row : HBaseTestingUtil.ROWS) { 1859 Put put = new Put(row); 1860 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1861 for (int i = 0; i < f.length; i++) { 1862 byte[] value1 = value != null ? value : row; 1863 put.addColumn(f[i], f[i], value1); 1864 } 1865 puts.add(put); 1866 } 1867 t.put(puts); 1868 return puts.size(); 1869 } 1870 1871 /** 1872 * A tracker for tracking and validating table rows generated with 1873 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1874 */ 1875 public static class SeenRowTracker { 1876 int dim = 'z' - 'a' + 1; 1877 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1878 byte[] startRow; 1879 byte[] stopRow; 1880 1881 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1882 this.startRow = startRow; 1883 this.stopRow = stopRow; 1884 } 1885 1886 void reset() { 1887 for (byte[] row : ROWS) { 1888 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1889 } 1890 } 1891 1892 int i(byte b) { 1893 return b - 'a'; 1894 } 1895 1896 public void addRow(byte[] row) { 1897 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1898 } 1899 1900 /** 1901 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1902 * rows none 1903 */ 1904 public void validate() { 1905 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1906 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1907 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1908 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1909 int expectedCount = 0; 1910 if ( 1911 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1912 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1913 ) { 1914 expectedCount = 1; 1915 } 1916 if (count != expectedCount) { 1917 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1918 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1919 + "instead of " + expectedCount); 1920 } 1921 } 1922 } 1923 } 1924 } 1925 } 1926 1927 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1928 return 
loadRegion(r, f, false); 1929 } 1930 1931 public int loadRegion(final Region r, final byte[] f) throws IOException { 1932 return loadRegion((HRegion) r, f); 1933 } 1934 1935 /** 1936 * Load region with rows from 'aaa' to 'zzz'. 1937 * @param r Region 1938 * @param f Family 1939 * @param flush flush the cache if true 1940 * @return Count of rows loaded. 1941 */ 1942 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1943 byte[] k = new byte[3]; 1944 int rowCount = 0; 1945 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1946 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1947 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1948 k[0] = b1; 1949 k[1] = b2; 1950 k[2] = b3; 1951 Put put = new Put(k); 1952 put.setDurability(Durability.SKIP_WAL); 1953 put.addColumn(f, null, k); 1954 if (r.getWAL() == null) { 1955 put.setDurability(Durability.SKIP_WAL); 1956 } 1957 int preRowCount = rowCount; 1958 int pause = 10; 1959 int maxPause = 1000; 1960 while (rowCount == preRowCount) { 1961 try { 1962 r.put(put); 1963 rowCount++; 1964 } catch (RegionTooBusyException e) { 1965 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1966 Threads.sleep(pause); 1967 } 1968 } 1969 } 1970 } 1971 if (flush) { 1972 r.flush(true); 1973 } 1974 } 1975 return rowCount; 1976 } 1977 1978 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1979 throws IOException { 1980 for (int i = startRow; i < endRow; i++) { 1981 byte[] data = Bytes.toBytes(String.valueOf(i)); 1982 Put put = new Put(data); 1983 put.addColumn(f, null, data); 1984 t.put(put); 1985 } 1986 } 1987 1988 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1989 throws IOException { 1990 for (int i = 0; i < totalRows; i++) { 1991 byte[] row = new byte[rowSize]; 1992 Bytes.random(row); 1993 Put put = new Put(row); 1994 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 1995 t.put(put); 1996 } 1997 } 1998 1999 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 2000 int replicaId) throws IOException { 2001 for (int i = startRow; i < endRow; i++) { 2002 String failMsg = "Failed verification of row :" + i; 2003 byte[] data = Bytes.toBytes(String.valueOf(i)); 2004 Get get = new Get(data); 2005 get.setReplicaId(replicaId); 2006 get.setConsistency(Consistency.TIMELINE); 2007 Result result = table.get(get); 2008 assertTrue(failMsg, result.containsColumn(f, null)); 2009 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2010 Cell cell = result.getColumnLatestCell(f, null); 2011 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2012 cell.getValueOffset(), cell.getValueLength())); 2013 } 2014 } 2015 2016 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2017 throws IOException { 2018 verifyNumericRows((HRegion) region, f, startRow, endRow); 2019 } 2020 2021 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2022 throws IOException { 2023 verifyNumericRows(region, f, startRow, endRow, true); 2024 } 2025 2026 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2027 final boolean present) throws IOException { 2028 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2029 } 2030 2031 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2032 final boolean present) throws IOException { 2033 for (int i = startRow; i < endRow; i++) { 2034 String failMsg = "Failed 
verification of row :" + i; 2035 byte[] data = Bytes.toBytes(String.valueOf(i)); 2036 Result result = region.get(new Get(data)); 2037 2038 boolean hasResult = result != null && !result.isEmpty(); 2039 assertEquals(failMsg + result, present, hasResult); 2040 if (!present) continue; 2041 2042 assertTrue(failMsg, result.containsColumn(f, null)); 2043 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2044 Cell cell = result.getColumnLatestCell(f, null); 2045 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2046 cell.getValueOffset(), cell.getValueLength())); 2047 } 2048 } 2049 2050 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2051 throws IOException { 2052 for (int i = startRow; i < endRow; i++) { 2053 byte[] data = Bytes.toBytes(String.valueOf(i)); 2054 Delete delete = new Delete(data); 2055 delete.addFamily(f); 2056 t.delete(delete); 2057 } 2058 } 2059 2060 /** 2061 * Return the number of rows in the given table. 2062 * @param table to count rows 2063 * @return count of rows 2064 */ 2065 public static int countRows(final Table table) throws IOException { 2066 return countRows(table, new Scan()); 2067 } 2068 2069 public static int countRows(final Table table, final Scan scan) throws IOException { 2070 try (ResultScanner results = table.getScanner(scan)) { 2071 int count = 0; 2072 while (results.next() != null) { 2073 count++; 2074 } 2075 return count; 2076 } 2077 } 2078 2079 public static int countRows(final Table table, final byte[]... families) throws IOException { 2080 Scan scan = new Scan(); 2081 for (byte[] family : families) { 2082 scan.addFamily(family); 2083 } 2084 return countRows(table, scan); 2085 } 2086 2087 /** 2088 * Return the number of rows in the given table. 2089 */ 2090 public int countRows(final TableName tableName) throws IOException { 2091 try (Table table = getConnection().getTable(tableName)) { 2092 return countRows(table); 2093 } 2094 } 2095 2096 public static int countRows(final Region region) throws IOException { 2097 return countRows(region, new Scan()); 2098 } 2099 2100 public static int countRows(final Region region, final Scan scan) throws IOException { 2101 try (InternalScanner scanner = region.getScanner(scan)) { 2102 return countRows(scanner); 2103 } 2104 } 2105 2106 public static int countRows(final InternalScanner scanner) throws IOException { 2107 int scannedCount = 0; 2108 List<Cell> results = new ArrayList<>(); 2109 boolean hasMore = true; 2110 while (hasMore) { 2111 hasMore = scanner.next(results); 2112 scannedCount += results.size(); 2113 results.clear(); 2114 } 2115 return scannedCount; 2116 } 2117 2118 /** 2119 * Return an md5 digest of the entire contents of a table. 
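   * <p>
   * Note the digest covers row keys only, as produced by the scan below. Two tables holding the
   * same set of row keys therefore compare equal, e.g. (table handles hypothetical):
   * </p>
   *
   * <pre>
   * assertEquals(util.checksumRows(source), util.checksumRows(copy));
   * </pre>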
   */
  public String checksumRows(final Table table) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("MD5");
    try (ResultScanner results = table.getScanner(new Scan())) {
      for (Result res : results) {
        digest.update(res.getRow());
      }
    }
    // Render the digest bytes as hex; MessageDigest#toString() does not include them.
    return Bytes.toHex(digest.digest());
  }

  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
  static {
    int i = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          ROWS[i][0] = b1;
          ROWS[i][1] = b2;
          ROWS[i][2] = b3;
          i++;
        }
      }
    }
  }

  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };

  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };

  /**
   * Create rows in hbase:meta for regions of the specified table with the specified start keys.
   * The first startKey should be a 0 length byte array if you want to form a proper range of
   * regions.
   * @return list of region info for regions added to meta
   */
  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
    final TableDescriptor htd, byte[][] startKeys) throws IOException {
    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
        TableState.State.ENABLED);
      // add custom ones
      for (int i = 0; i < startKeys.length; i++) {
        int j = (i + 1) % startKeys.length;
        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
          .setEndKey(startKeys[j]).build();
        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
        newRegions.add(hri);
      }
      return newRegions;
    }
  }

  /**
   * Create an unmanaged WAL. Be sure to close it when you're through.
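   * <p>
   * A minimal sketch of the create/close contract (names hypothetical):
   * </p>
   *
   * <pre>
   * WAL wal = createWal(conf, rootDir, regionInfo);
   * try {
   *   // append edits, sync, roll, etc.
   * } finally {
   *   wal.close();
   * }
   * </pre>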
   */
  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless I pass along via the conf.
    Configuration confForWAL = new Configuration(conf);
    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
  }

  /**
   * Create a region with its own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd) throws IOException {
    return createRegionAndWAL(info, rootDir, conf, htd, true);
  }

  /**
   * Create a region with its own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
    throws IOException {
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setBlockCache(blockCache);
    region.initialize();
    return region;
  }

  /**
   * Create a region with its own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
    throws IOException {
    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
    region.setMobFileCache(mobFileCache);
    region.initialize();
    return region;
  }

  /**
   * Create a region with its own WAL. Be sure to call
   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
   */
  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    WAL wal = createWal(conf, rootDir, info);
    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
  }

  /**
   * Find a region server other than the one identified by the parameter.
   * @return another region server, or null if there is no other
   */
  public HRegionServer getOtherRegionServer(HRegionServer rs) {
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
        return rst.getRegionServer();
      }
    }
    return null;
  }

  /**
   * Tool to get the reference to the region server object that holds the region of the specified
   * user table.
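   * <p>
   * Typical use is locating the server to kill or restart in fault-tolerance tests, e.g. (names
   * hypothetical):
   * </p>
   *
   * <pre>
   * HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
   * </pre>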
   * @param tableName user table to lookup in hbase:meta
   * @return region server that holds it, null if the row doesn't exist
   */
  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
    throws IOException, InterruptedException {
    List<RegionInfo> regions = getAdmin().getRegions(tableName);
    if (regions == null || regions.isEmpty()) {
      return null;
    }
    LOG.debug("Found " + regions.size() + " regions for table " + tableName);

    byte[] firstRegionName =
      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));

    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // The client pause is in milliseconds, so retry with a millisecond sleep interval.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
    while (retrier.shouldRetry()) {
      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
      if (index != -1) {
        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
      }
      // Came back -1. Region may not be online yet. Sleep a while.
      retrier.sleepUntilNextRetry();
    }
    return null;
  }

  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
   * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
   * MiniMRCluster instances with different log dirs.
   * @throws IOException When starting the cluster fails.
   */
  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
    conf.setIfUnset(
      "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", "99.0");
    startMiniMapReduceCluster(2);
    return mrCluster;
  }

  /**
   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
   * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to
   * start multiple MiniMRCluster instances with different log dirs.
   * @param servers The number of <code>TaskTracker</code>s to start.
   * @throws IOException When starting the cluster fails.
   */
  private void startMiniMapReduceCluster(final int servers) throws IOException {
    if (mrCluster != null) {
      throw new IllegalStateException("MiniMRCluster is already running");
    }
    LOG.info("Starting mini mapreduce cluster...");
    setupClusterTestDir();
    createDirsAndSetProperties();

    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // we up the VM usable memory so that processes don't get killed.
    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);

    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
    // this avoids the problem by disabling speculative task execution in tests.
    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    ////

    // Yarn container runs in an independent JVM. We need to pass the argument manually here if
    // the JDK version >= 17. Otherwise, the MiniMRCluster will fail.
    if (JVM.getJVMSpecVersion() >= 17) {
      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
      conf.set("yarn.app.mapreduce.am.command-opts",
        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
    }

    // Allow the user to override FS URI for this map-reduce cluster to use.
    mrCluster =
      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
        1, null, null, new JobConf(this.conf));
    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
    if (jobConf == null) {
      jobConf = mrCluster.createJobConf();
    }

    // Hadoop MiniMR overwrites this while it should not
    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
    LOG.info("Mini mapreduce cluster started");

    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
    // necessary config properties here. YARN-129 required adding a few properties.
    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
    // this is for MRv2 support; MR1 ignores it
    conf.set("mapreduce.framework.name", "yarn");
    conf.setBoolean("yarn.is.minicluster", true);
    String rmAddress = jobConf.get("yarn.resourcemanager.address");
    if (rmAddress != null) {
      conf.set("yarn.resourcemanager.address", rmAddress);
    }
    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
    if (historyAddress != null) {
      conf.set("mapreduce.jobhistory.address", historyAddress);
    }
    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
    if (schedulerAddress != null) {
      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
    }
    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
    if (mrJobHistoryWebappAddress != null) {
      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
    }
    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
    if (yarnRMWebappAddress != null) {
      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
    }
  }

  /**
   * Stops the previously started <code>MiniMRCluster</code>.
   */
  public void shutdownMiniMapReduceCluster() {
    if (mrCluster != null) {
      LOG.info("Stopping mini mapreduce cluster...");
      mrCluster.shutdown();
      mrCluster = null;
      LOG.info("Mini mapreduce cluster stopped");
    }
    // Restore configuration to point to local jobtracker
    conf.set("mapreduce.jobtracker.address", "local");
  }

  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS.
   */
  public RegionServerServices createMockRegionServerService() throws IOException {
    return createMockRegionServerService((ServerName) null);
  }
  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
   * TestTokenAuthentication
   */
  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
    throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
    rss.setFileSystem(getTestFileSystem());
    rss.setRpcServer(rpc);
    return rss;
  }

  /**
   * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
   * TestOpenRegionHandler
   */
  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
    rss.setFileSystem(getTestFileSystem());
    return rss;
  }

  /**
   * Expire the Master's session
   */
  public void expireMasterSession() throws Exception {
    HMaster master = getMiniHBaseCluster().getMaster();
    expireSession(master.getZooKeeper(), false);
  }

  /**
   * Expire a region server's session
   * @param index which RS
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
    expireSession(rs.getZooKeeper(), false);
    decrementMinRegionServerCount();
  }

  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for the newly spawned master
    // this.hbaseCluster shares this configuration too
    decrementMinRegionServerCount(getConfiguration());

    // each master thread keeps a copy of configuration
    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
      decrementMinRegionServerCount(master.getMaster().getConfiguration());
    }
  }

  private void decrementMinRegionServerCount(Configuration conf) {
    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    if (currentCount != -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
    }
  }

  public void expireSession(ZKWatcher nodeZK) throws Exception {
    expireSession(nodeZK, false);
  }

  /**
   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
   * http://hbase.apache.org/book.html#trouble.zookeeper
   * <p/>
   * There are issues when doing this:
   * <ol>
   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
   * </ol>
   * @param nodeZK - the ZK watcher to expire
   * @param checkStatus - true to check if we can create a Table with the current configuration.
   */
  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();

    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event, just
    // closing the connection should be enough.
    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        LOG.info("Monitor ZKW received event=" + watchedEvent);
      }
    }, sessionID, password);

    // Making it expire
    ZooKeeper newZK =
      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);

    // ensure that we have connection to the server before closing down, otherwise
    // the close session event will be eaten out before we start CONNECTING state
    long start = EnvironmentEdgeManager.currentTime();
    while (
      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
    ) {
      Thread.sleep(1);
    }
    newZK.close();
    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));

    // Now closing & waiting to be sure that the clients get it.
    monitor.close();

    if (checkStatus) {
      getConnection().getTable(TableName.META_TABLE_NAME).close();
    }
  }

  /**
   * Get the Mini HBase cluster.
   * @return hbase cluster
   * @see #getHBaseClusterInterface()
   */
  public SingleProcessHBaseCluster getHBaseCluster() {
    return getMiniHBaseCluster();
  }

  /**
   * Returns the HBaseCluster instance.
   * <p>
   * The returned object can be any of the subclasses of HBaseCluster, and the tests referring to
   * this should not assume that the cluster is a mini cluster or a distributed one. If the test
   * only works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be
   * used instead without the need to type-cast.
   */
  public HBaseClusterInterface getHBaseClusterInterface() {
    // implementation note: we should rename this method as #getHBaseCluster(),
    // but this would require refactoring 90+ calls.
    return hbaseCluster;
  }

  /**
   * Resets the connections so that the next time getConnection() is called, a new connection is
   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
   * the connection is not valid anymore.
   * <p/>
   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
   * written, not all start() stop() calls go through this class. Most tests directly operate on
   * the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
   * maintain the connection state automatically. Cleaning this is a much bigger refactor.
   */
  public void invalidateConnection() throws IOException {
    closeConnection();
    // Update the master addresses if they changed.
    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
    final String masterConfAfter =
      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
      masterConfigBefore, masterConfAfter);
    conf.set(HConstants.MASTER_ADDRS_KEY,
      getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
  }

  /**
   * Get a shared Connection to the cluster. This method is thread safe.
   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
   */
  public Connection getConnection() throws IOException {
    return getAsyncConnection().toConnection();
  }

  /**
   * Get an assigned Connection to the cluster. This method is thread safe.
   * @param user assigned user
   * @return A Connection with assigned user.
   */
  public Connection getConnection(User user) throws IOException {
    return getAsyncConnection(user).toConnection();
  }

  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
   *         of cluster.
   */
  public AsyncClusterConnection getAsyncConnection() throws IOException {
    try {
      return asyncConnection.updateAndGet(connection -> {
        if (connection == null) {
          try {
            User user = UserProvider.instantiate(conf).getCurrent();
            connection = getAsyncConnection(user);
          } catch (IOException ioe) {
            throw new UncheckedIOException("Failed to create connection", ioe);
          }
        }
        return connection;
      });
    } catch (UncheckedIOException exception) {
      throw exception.getCause();
    }
  }

  /**
   * Get an assigned AsyncClusterConnection to the cluster. This method is thread safe.
   * @param user assigned user
   * @return An AsyncClusterConnection with assigned user.
   */
  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
  }

  public void closeConnection() throws IOException {
    if (hbaseAdmin != null) {
      Closeables.close(hbaseAdmin, true);
      hbaseAdmin = null;
    }
    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
    if (asyncConnection != null) {
      Closeables.close(asyncConnection, true);
    }
  }

  /**
   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
   * it has no effect, it will be closed automatically when the cluster shuts down.
   */
  public Admin getAdmin() throws IOException {
    if (hbaseAdmin == null) {
      this.hbaseAdmin = getConnection().getAdmin();
    }
    return hbaseAdmin;
  }

  private Admin hbaseAdmin = null;

  /**
   * Returns an {@link Hbck} instance. Needs to be closed when done.
   */
  public Hbck getHbck() throws IOException {
    return getConnection().getHbck();
  }
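  // A minimal sketch of Hbck usage (the encoded region name is hypothetical); the instance must
  // be closed, so try-with-resources is the idiomatic pattern:
  //
  //   try (Hbck hbck = util.getHbck()) {
  //     hbck.assigns(Collections.singletonList("1588230740"));
  //   }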
  /**
   * Unassign the named region.
   * @param regionName The region to unassign.
   */
  public void unassignRegion(String regionName) throws IOException {
    unassignRegion(Bytes.toBytes(regionName));
  }

  /**
   * Unassign the named region.
   * @param regionName The region to unassign.
   */
  public void unassignRegion(byte[] regionName) throws IOException {
    getAdmin().unassign(regionName);
  }

  /**
   * Closes the region containing the given row.
   * @param row The row to find the containing region.
   * @param table The table to find the region.
   */
  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
    unassignRegionByRow(Bytes.toBytes(row), table);
  }

  /**
   * Closes the region containing the given row.
   * @param row The row to find the containing region.
   * @param table The table to find the region.
   */
  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
    HRegionLocation hrl = table.getRegionLocation(row);
    unassignRegion(hrl.getRegion().getRegionName());
  }

  /**
   * Retrieves a splittable region randomly from tableName
   * @param tableName name of table
   * @param maxAttempts maximum number of attempts, unlimited for value of -1
   * @return the HRegion chosen, null if none was found within limit of maxAttempts
   */
  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
    int regCount = regions.size();
    Set<Integer> attempted = new HashSet<>();
    int idx;
    int attempts = 0;
    do {
      regions = getHBaseCluster().getRegions(tableName);
      if (regCount != regions.size()) {
        // if there was region movement, clear attempted Set
        attempted.clear();
      }
      regCount = regions.size();
      // There are chances that before we get the region for the table from an RS the region may
      // be going for CLOSE. This may be because online schema change is enabled
      if (regCount > 0) {
        idx = ThreadLocalRandom.current().nextInt(regCount);
        // if we have just tried this region, there is no need to try again
        if (attempted.contains(idx)) {
          continue;
        }
        HRegion region = regions.get(idx);
        if (region.checkSplit().isPresent()) {
          return region;
        }
        attempted.add(idx);
      }
      attempts++;
    } while (maxAttempts == -1 || attempts < maxAttempts);
    return null;
  }

  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }

  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
    setDFSCluster(cluster, true);
  }

  /**
   * Set the MiniDFSCluster
   * @param cluster cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
   *          it is set.
   * @throws IllegalStateException if the passed cluster is up when it is required to be down
   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
   */
  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
    throws IllegalStateException, IOException {
    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
    }
    this.dfsCluster = cluster;
    this.setFs();
  }

  public FileSystem getTestFileSystem() throws IOException {
    return HFileSystem.get(conf);
  }

  /**
   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
   * (30 seconds).
   * @param table Table to wait on.
   */
  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
    waitTableAvailable(table.getName(), 30000);
  }

  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }

  /**
   * Wait until all regions in a table have been assigned
   * @param table Table to wait on.
   * @param timeoutMillis Timeout.
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
  }

  public String explainTableAvailability(TableName tableName) throws IOException {
    StringBuilder msg =
      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
    if (getHBaseCluster().getMaster().isAlive()) {
      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster()
        .getAssignmentManager().getRegionStates().getRegionAssignments();
      final List<Pair<RegionInfo, ServerName>> metaLocations =
        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
        RegionInfo hri = metaLocation.getFirst();
        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM ( ").append(sn)
            .append(" <> ").append(assignments.get(hri)).append(" )");
        }
      }
    }
    return msg.toString();
  }

  public String explainTableState(final TableName table, TableState.State state)
    throws IOException {
    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
    if (tableState == null) {
      return "TableState in META: No table state in META for table " + table
        + " last state in meta (including deleted) is " + findLastTableState(table);
    } else if (!tableState.inStates(state)) {
      return "TableState in META: Not " + state + " state, but " + tableState;
    } else {
      return "TableState in META: OK";
    }
  }

  @Nullable
  public TableState findLastTableState(final TableName table) throws IOException {
    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result r) throws IOException {
        if (!Arrays.equals(r.getRow(), table.getName())) {
          return false;
        }
        TableState state = CatalogFamilyFormat.getTableState(r);
        if (state != null) {
          lastTableState.set(state);
        }
        return true;
      }
    };
    MetaTableAccessor.scanMeta(getConnection(), null, null,
      ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
    return lastTableState.get();
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
   * regions have all been assigned. Will timeout after the default period (30 seconds). Tolerates
   * a nonexistent table.
   * @param table the table to wait on.
   * @throws InterruptedException if interrupted while waiting
   * @throws IOException if an IO problem is encountered
   */
  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
    waitTableEnabled(table, 30000);
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
   * regions have all been assigned.
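   * <p>
   * For example, a test that created a table through the raw Admin API can block on it with a
   * timeout (values hypothetical):
   * </p>
   *
   * <pre>
   * util.waitTableEnabled(TableName.valueOf("testtable"), 30000);
   * </pre>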
2847 * @see #waitTableEnabled(TableName, long) 2848 * @param table Table to wait on. 2849 * @param timeoutMillis Time to wait on it being marked enabled. 2850 */ 2851 public void waitTableEnabled(byte[] table, long timeoutMillis) 2852 throws InterruptedException, IOException { 2853 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2854 } 2855 2856 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2857 waitFor(timeoutMillis, predicateTableEnabled(table)); 2858 } 2859 2860 /** 2861 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 2862 * after default period (30 seconds) 2863 * @param table Table to wait on. 2864 */ 2865 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2866 waitTableDisabled(table, 30000); 2867 } 2868 2869 public void waitTableDisabled(TableName table, long millisTimeout) 2870 throws InterruptedException, IOException { 2871 waitFor(millisTimeout, predicateTableDisabled(table)); 2872 } 2873 2874 /** 2875 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 2876 * @param table Table to wait on. 2877 * @param timeoutMillis Time to wait on it being marked disabled. 2878 */ 2879 public void waitTableDisabled(byte[] table, long timeoutMillis) 2880 throws InterruptedException, IOException { 2881 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2882 } 2883 2884 /** 2885 * Make sure that at least the specified number of region servers are running 2886 * @param num minimum number of region servers that should be running 2887 * @return true if we started some servers 2888 */ 2889 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2890 boolean startedServer = false; 2891 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2892 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2893 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2894 startedServer = true; 2895 } 2896 2897 return startedServer; 2898 } 2899 2900 /** 2901 * Make sure that at least the specified number of region servers are running. We don't count the 2902 * ones that are currently stopping or are stopped. 2903 * @param num minimum number of region servers that should be running 2904 * @return true if we started some servers 2905 */ 2906 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2907 boolean startedServer = ensureSomeRegionServersAvailable(num); 2908 2909 int nonStoppedServers = 0; 2910 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2911 2912 HRegionServer hrs = rst.getRegionServer(); 2913 if (hrs.isStopping() || hrs.isStopped()) { 2914 LOG.info("A region server is stopped or stopping:" + hrs); 2915 } else { 2916 nonStoppedServers++; 2917 } 2918 } 2919 for (int i = nonStoppedServers; i < num; ++i) { 2920 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2921 startedServer = true; 2922 } 2923 return startedServer; 2924 } 2925 2926 /** 2927 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 2928 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 2929 * @param c Initial configuration 2930 * @param differentiatingSuffix Suffix to differentiate this user from others. 2931 * @return A new configuration instance with a different user set into it. 
2932 */ 2933 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2934 throws IOException { 2935 FileSystem currentfs = FileSystem.get(c); 2936 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2937 return User.getCurrent(); 2938 } 2939 // Else distributed filesystem. Make a new instance per daemon. Below 2940 // code is taken from the AppendTestUtil over in hdfs. 2941 String username = User.getCurrent().getName() + differentiatingSuffix; 2942 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2943 return user; 2944 } 2945 2946 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2947 throws IOException { 2948 NavigableSet<String> online = new TreeSet<>(); 2949 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2950 try { 2951 for (RegionInfo region : ProtobufUtil 2952 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2953 online.add(region.getRegionNameAsString()); 2954 } 2955 } catch (RegionServerStoppedException e) { 2956 // That's fine. 2957 } 2958 } 2959 return online; 2960 } 2961 2962 /** 2963 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it is hard-coded to 5, which makes 2964 * tests linger. Here is the exception you'll see: 2965 * 2966 * <pre> 2967 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2968 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2969 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2970 * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2971 * </pre> 2972 * 2973 * @param stream A DFSClient.DFSOutputStream. 2974 */ 2975 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 2976 try { 2977 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 2978 for (Class<?> clazz : clazzes) { 2979 String className = clazz.getSimpleName(); 2980 if (className.equals("DFSOutputStream")) { 2981 if (clazz.isInstance(stream)) { 2982 Field maxRecoveryErrorCountField = 2983 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 2984 maxRecoveryErrorCountField.setAccessible(true); 2985 maxRecoveryErrorCountField.setInt(stream, max); 2986 break; 2987 } 2988 } 2989 } 2990 } catch (Exception e) { 2991 LOG.info("Could not set max recovery field", e); 2992 } 2993 } 2994 2995 /** 2996 * Uses the assignment manager directly to assign the region, and waits until the specified region 2997 * has completed assignment. 2998 * @return true if the region is assigned, false otherwise. 2999 */ 3000 public boolean assignRegion(final RegionInfo regionInfo) 3001 throws IOException, InterruptedException { 3002 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3003 am.assign(regionInfo); 3004 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3005 } 3006 3007 /** 3008 * Move a region to the destination server and wait until the region is completely moved and 3009 * online. 3010 * @param destRegion region to move 3011 * @param destServer destination server of the region 3012 */ 3013 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3014 throws InterruptedException, IOException { HMaster master = getMiniHBaseCluster().getMaster(); 3015 // TODO: Here we start the move. The move can take a while.
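// Admin.move is asynchronous: it only submits the move request. The polling loop below watches // the AssignmentManager's RegionStates until the region is reported on the destination server.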
3016 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3017 while (true) { 3018 ServerName serverName = 3019 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3020 if (serverName != null && serverName.equals(destServer)) { 3021 assertRegionOnServer(destRegion, serverName, 2000); 3022 break; 3023 } 3024 Thread.sleep(10); 3025 } 3026 } 3027 3028 /** 3029 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3030 * configurable timeout value (default is 60 seconds). This means all regions have been deployed, 3031 * the master has been informed, and hbase:meta has been updated with each region's deployed 3032 * server. 3033 * @param tableName the table name 3034 */ 3035 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3036 waitUntilAllRegionsAssigned(tableName, 3037 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3038 } 3039 3040 /** 3041 * Wait until all system tables' regions get assigned 3042 */ 3043 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3044 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3045 } 3046 /** 3047 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3048 * timeout. This means all regions have been deployed, the master has been informed, and 3049 * hbase:meta has been updated with each region's deployed server. 3050 * @param tableName the table name 3051 * @param timeout timeout, in milliseconds 3052 */ 3053 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3054 throws IOException { 3055 if (!TableName.isMetaTableName(tableName)) { 3056 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3057 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " 3058 + timeout + "ms"); 3059 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3060 @Override 3061 public String explainFailure() throws IOException { 3062 return explainTableAvailability(tableName); 3063 } 3064 3065 @Override 3066 public boolean evaluate() throws IOException { 3067 Scan scan = new Scan(); 3068 scan.addFamily(HConstants.CATALOG_FAMILY); 3069 boolean tableFound = false; 3070 try (ResultScanner s = meta.getScanner(scan)) { 3071 for (Result r; (r = s.next()) != null;) { 3072 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3073 RegionInfo info = RegionInfo.parseFromOrNull(b); 3074 if (info != null && info.getTable().equals(tableName)) { 3075 // Get server hosting this region from catalog family. Return false if no server 3076 // hosting this region, or if the server hosting this region was recently killed 3077 // (for fault tolerance testing).
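// The info:server cell holds "host:port" and info:serverstartcode holds the start code; // together they reconstruct the ServerName of the hosting region server.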
3078 tableFound = true; 3079 byte[] server = 3080 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3081 if (server == null) { 3082 return false; 3083 } else { 3084 byte[] startCode = 3085 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3086 ServerName serverName = 3087 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3088 + Bytes.toLong(startCode)); 3089 if ( 3090 !getHBaseClusterInterface().isDistributedCluster() 3091 && getHBaseCluster().isKilledRS(serverName) 3092 ) { 3093 return false; 3094 } 3095 } 3096 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3097 return false; 3098 } 3099 } 3100 } 3101 } 3102 if (!tableFound) { 3103 LOG.warn( 3104 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3105 } 3106 return tableFound; 3107 } 3108 }); 3109 } 3110 } 3111 LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states."); 3112 // check from the master state if we are using a mini cluster 3113 if (!getHBaseClusterInterface().isDistributedCluster()) { 3114 // So, all regions are in the meta table but make sure master knows of the assignments before 3115 // returning -- sometimes this can lag. 3116 HMaster master = getHBaseCluster().getMaster(); 3117 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3118 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3119 @Override 3120 public String explainFailure() throws IOException { 3121 return explainTableAvailability(tableName); 3122 } 3123 3124 @Override 3125 public boolean evaluate() throws IOException { 3126 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3127 return hris != null && !hris.isEmpty(); 3128 } 3129 }); 3130 } 3131 LOG.info("All regions for table " + tableName + " assigned."); 3132 } 3133 3134 /** 3135 * Do a small get/scan against one store. This is required because store has no actual methods of 3136 * querying itself, and relies on StoreScanner. 3137 */ 3138 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3139 Scan scan = new Scan(get); 3140 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3141 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3142 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3143 // readpoint 0. 3144 0); 3145 3146 List<Cell> result = new ArrayList<>(); 3147 scanner.next(result); 3148 if (!result.isEmpty()) { 3149 // verify that we are on the row we want: 3150 Cell kv = result.get(0); 3151 if (!CellUtil.matchingRows(kv, get.getRow())) { 3152 result.clear(); 3153 } 3154 } 3155 scanner.close(); 3156 return result; 3157 } 3158 3159 /** 3160 * Create region split keys between startKey and endKey 3161 * @param numRegions the number of regions to be created. It has to be greater than 3. 3162 * @return resulting split keys 3163 */ 3164 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3165 assertTrue(numRegions > 3); 3166 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3167 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3168 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3169 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3170 return result; 3171 } 3172 3173 /** 3174 * Do a small get/scan against one store. This is required because store has no actual methods of 3175 * querying itself, and relies on StoreScanner.
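* <p> * A usage sketch (hypothetical names; assumes {@code store} is an HStore obtained from a test region created elsewhere in the test): * </p> * <pre> * NavigableSet&lt;byte[]&gt; columns = new TreeSet&lt;&gt;(Bytes.BYTES_COMPARATOR); * columns.add(Bytes.toBytes("q")); * List&lt;Cell&gt; cells = getFromStoreFile(store, Bytes.toBytes("row0"), columns); * </pre>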
3176 */ 3177 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3178 throws IOException { 3179 Get get = new Get(row); 3180 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3181 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3182 3183 return getFromStoreFile(store, get); 3184 } 3185 3186 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3187 final List<? extends Cell> actual) { 3188 final int eLen = expected.size(); 3189 final int aLen = actual.size(); 3190 final int minLen = Math.min(eLen, aLen); 3191 3192 int i = 0; 3193 while ( 3194 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3195 ) { 3196 i++; 3197 } 3198 3199 if (additionalMsg == null) { 3200 additionalMsg = ""; 3201 } 3202 if (!additionalMsg.isEmpty()) { 3203 additionalMsg = ". " + additionalMsg; 3204 } 3205 3206 if (eLen != aLen || i != minLen) { 3207 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3208 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i) 3209 + " (length " + aLen + ")" + additionalMsg); 3210 } 3211 } 3212 3213 public static <T> String safeGetAsStr(List<T> lst, int i) { 3214 if (0 <= i && i < lst.size()) { 3215 return lst.get(i).toString(); 3216 } else { 3217 return "<out_of_range>"; 3218 } 3219 } 3220 3221 public String getRpcConnnectionURI() throws UnknownHostException { 3222 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3223 } 3224 3225 public String getZkConnectionURI() { 3226 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3227 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3228 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3229 } 3230 3231 /** 3232 * Get the zk based cluster key for this cluster. 3233 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use a connection URI to specify the 3234 * connection info of a cluster. Kept here only for compatibility.
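* <p> * A hedged migration sketch (assumes a ConnectionFactory overload that accepts a connection URI; exact signatures may differ by version): instead of building a connection from this cluster key, derive a URI, e.g. {@code ConnectionFactory.createConnection(URI.create(util.getZkConnectionURI()), conf)}, where {@code util} is a hypothetical instance of this class. * </p>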
3235 * @see #getRpcConnnectionURI() 3236 * @see #getZkConnectionURI() 3237 */ 3238 @Deprecated 3239 public String getClusterKey() { 3240 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3241 + ":" 3242 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3243 } 3244 3245 /** 3246 * Creates a random table with the given parameters 3247 */ 3248 public Table createRandomTable(TableName tableName, final Collection<String> families, 3249 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3250 final int numRowsPerFlush) throws IOException, InterruptedException { 3251 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3252 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3253 + maxVersions + "\n"); 3254 3255 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3256 final int numCF = families.size(); 3257 final byte[][] cfBytes = new byte[numCF][]; 3258 { 3259 int cfIndex = 0; 3260 for (String cf : families) { 3261 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3262 } 3263 } 3264 3265 final int actualStartKey = 0; 3266 final int actualEndKey = Integer.MAX_VALUE; 3267 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3268 final int splitStartKey = actualStartKey + keysPerRegion; 3269 final int splitEndKey = actualEndKey - keysPerRegion; 3270 final String keyFormat = "%08x"; 3271 final Table table = createTable(tableName, cfBytes, maxVersions, 3272 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3273 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3274 3275 if (hbaseCluster != null) { 3276 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3277 } 3278 3279 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3280 3281 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3282 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3283 final byte[] row = Bytes.toBytes( 3284 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3285 3286 Put put = new Put(row); 3287 Delete del = new Delete(row); 3288 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3289 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3290 final long ts = rand.nextInt(); 3291 final byte[] qual = Bytes.toBytes("col" + iCol); 3292 if (rand.nextBoolean()) { 3293 final byte[] value = 3294 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3295 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3296 put.addColumn(cf, qual, ts, value); 3297 } else if (rand.nextDouble() < 0.8) { 3298 del.addColumn(cf, qual, ts); 3299 } else { 3300 del.addColumns(cf, qual, ts); 3301 } 3302 } 3303 3304 if (!put.isEmpty()) { 3305 mutator.mutate(put); 3306 } 3307 3308 if (!del.isEmpty()) { 3309 mutator.mutate(del); 3310 } 3311 } 3312 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3313 mutator.flush(); 3314 if (hbaseCluster != null) { 3315 getMiniHBaseCluster().flushcache(table.getName()); 3316 } 3317 } 3318 mutator.close(); 3319 3320 return table; 3321 } 3322 3323 public static int randomFreePort() { 3324 return HBaseCommonTestingUtil.randomFreePort(); 3325 } 3326 3327 public static String randomMultiCastAddress() { 3328 return "226.1.1." 
+ ThreadLocalRandom.current().nextInt(254); 3329 } 3330 3331 public static void waitForHostPort(String host, int port) throws IOException { 3332 final int maxTimeMs = 10000; 3333 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3334 IOException savedException = null; 3335 LOG.info("Waiting for server at " + host + ":" + port); 3336 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3337 try { 3338 Socket sock = new Socket(InetAddress.getByName(host), port); 3339 sock.close(); 3340 savedException = null; 3341 LOG.info("Server at " + host + ":" + port + " is available"); 3342 break; 3343 } catch (UnknownHostException e) { 3344 throw new IOException("Failed to look up " + host, e); 3345 } catch (IOException e) { 3346 savedException = e; 3347 } 3348 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3349 } 3350 3351 if (savedException != null) { 3352 throw savedException; 3353 } 3354 } 3355 3356 public static int getMetaRSPort(Connection connection) throws IOException { 3357 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3358 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3359 } 3360 } 3361 3362 /** 3363 * Due to an async racing issue, a region may not be in the online region list of a region server 3364 * yet, after the assignment znode is deleted and the new assignment is recorded in the master. 3365 */ 3366 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3367 final long timeout) throws IOException, InterruptedException { 3368 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3369 while (true) { 3370 List<RegionInfo> regions = getAdmin().getRegions(server); 3371 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3372 long now = EnvironmentEdgeManager.currentTime(); 3373 if (now > timeoutTime) break; 3374 Thread.sleep(10); 3375 } 3376 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3377 } 3378 3379 /** 3380 * Check to make sure the region is open on the specified region server, but not on any other one.
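* <p> * A usage sketch (hypothetical names; {@code hri} is the RegionInfo of the region under test): * </p> * <pre> * ServerName dest = getHBaseCluster().getRegionServer(0).getServerName(); * moveRegionAndWait(hri, dest); * assertRegionOnlyOnServer(hri, dest, 30000); * </pre>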
3381 */ 3382 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3383 final long timeout) throws IOException, InterruptedException { 3384 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3385 while (true) { 3386 List<RegionInfo> regions = getAdmin().getRegions(server); 3387 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3388 List<JVMClusterUtil.RegionServerThread> rsThreads = 3389 getHBaseCluster().getLiveRegionServerThreads(); 3390 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3391 HRegionServer rs = rsThread.getRegionServer(); 3392 if (server.equals(rs.getServerName())) { 3393 continue; 3394 } 3395 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3396 for (HRegion r : hrs) { 3397 assertTrue("Region should not be double assigned", 3398 r.getRegionInfo().getRegionId() != hri.getRegionId()); 3399 } 3400 } 3401 return; // good, we are happy 3402 } 3403 long now = EnvironmentEdgeManager.currentTime(); 3404 if (now > timeoutTime) break; 3405 Thread.sleep(10); 3406 } 3407 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3408 } 3409 3410 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException { 3411 TableDescriptor td = 3412 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3413 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3414 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3415 } 3416 3417 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3418 BlockCache blockCache) throws IOException { 3419 TableDescriptor td = 3420 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3421 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3422 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3423 } 3424 3425 public static void setFileSystemURI(String fsURI) { 3426 FS_URI = fsURI; 3427 } 3428 3429 /** 3430 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3431 */ 3432 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3433 return new ExplainingPredicate<IOException>() { 3434 @Override 3435 public String explainFailure() throws IOException { 3436 final RegionStates regionStates = 3437 getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 3438 return "found in transition: " + regionStates.getRegionsInTransition().toString(); 3439 } 3440 3441 @Override 3442 public boolean evaluate() throws IOException { 3443 HMaster master = getMiniHBaseCluster().getMaster(); 3444 if (master == null) return false; 3445 AssignmentManager am = master.getAssignmentManager(); 3446 if (am == null) return false; 3447 return !am.hasRegionsInTransition(); 3448 } 3449 }; 3450 } 3451 3452 /** 3453 * Returns a {@link Predicate} for checking that table is enabled 3454 */ 3455 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3456 return new ExplainingPredicate<IOException>() { 3457 @Override 3458 public String explainFailure() throws IOException { 3459 return explainTableState(tableName, TableState.State.ENABLED); 3460 } 3461 3462 @Override 3463 public boolean evaluate() throws IOException { 3464 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3465 
} 3466 }; 3467 } 3468 3469 /** 3470 * Returns a {@link Predicate} for checking that the table is disabled 3471 */ 3472 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3473 return new ExplainingPredicate<IOException>() { 3474 @Override 3475 public String explainFailure() throws IOException { 3476 return explainTableState(tableName, TableState.State.DISABLED); 3477 } 3478 3479 @Override 3480 public boolean evaluate() throws IOException { 3481 return getAdmin().isTableDisabled(tableName); 3482 } 3483 }; 3484 } 3485 3486 /** 3487 * Returns a {@link Predicate} for checking that the table is available 3488 */ 3489 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3490 return new ExplainingPredicate<IOException>() { 3491 @Override 3492 public String explainFailure() throws IOException { 3493 return explainTableAvailability(tableName); 3494 } 3495 3496 @Override 3497 public boolean evaluate() throws IOException { 3498 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3499 if (tableAvailable) { 3500 try (Table table = getConnection().getTable(tableName)) { 3501 TableDescriptor htd = table.getDescriptor(); 3502 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3503 .getAllRegionLocations()) { 3504 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3505 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3506 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3507 for (byte[] family : htd.getColumnFamilyNames()) { 3508 scan.addFamily(family); 3509 } 3510 try (ResultScanner scanner = table.getScanner(scan)) { 3511 scanner.next(); 3512 } 3513 } 3514 } 3515 } 3516 return tableAvailable; 3517 } 3518 }; 3519 } 3520 3521 /** 3522 * Wait until no regions in transition. 3523 * @param timeout How long to wait. 3524 */ 3525 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3526 waitFor(timeout, predicateNoRegionsInTransition()); 3527 } 3528 3529 /** 3530 * Wait until no regions in transition. (time limit 15 minutes) 3531 */ 3532 public void waitUntilNoRegionsInTransition() throws IOException { 3533 waitUntilNoRegionsInTransition(15 * 60000); 3534 } 3535 3536 /** 3537 * Wait until the labels are ready in VisibilityLabelsCache. 3538 */ 3539 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3540 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3541 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3542 3543 @Override 3544 public boolean evaluate() { 3545 for (String label : labels) { 3546 if (labelsCache.getLabelOrdinal(label) == 0) { 3547 return false; 3548 } 3549 } 3550 return true; 3551 } 3552 3553 @Override 3554 public String explainFailure() { 3555 for (String label : labels) { 3556 if (labelsCache.getLabelOrdinal(label) == 0) { 3557 return label + " is not available yet"; 3558 } 3559 } 3560 return ""; 3561 } 3562 }); 3563 } 3564 3565 /** 3566 * Create a set of column descriptors with the combinations of the available compression, 3567 * encoding, and bloom codecs. 3568 * @return the list of column descriptors 3569 */ 3570 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3571 return generateColumnDescriptors(""); 3572 } 3573 3574 /** 3575 * Create a set of column descriptors with the combinations of the available compression, 3576 * encoding, and bloom codecs.
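* <p> * A consumption sketch (the table name is illustrative): * </p> * <pre> * TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf("combos")); * generateColumnDescriptors("test").forEach(builder::setColumnFamily); * TableDescriptor td = builder.build(); * </pre>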
3577 * @param prefix family names prefix 3578 * @return the list of column descriptors 3579 */ 3580 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3581 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3582 long familyId = 0; 3583 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3584 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3585 for (BloomType bloomType : BloomType.values()) { 3586 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3587 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3588 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3589 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3590 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3591 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3592 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3593 familyId++; 3594 } 3595 } 3596 } 3597 return columnFamilyDescriptors; 3598 } 3599 3600 /** 3601 * Get supported compression algorithms. 3602 * @return supported compression algorithms. 3603 */ 3604 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3605 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3606 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3607 for (String algoName : allAlgos) { 3608 try { 3609 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3610 algo.getCompressor(); 3611 supportedAlgos.add(algo); 3612 } catch (Throwable t) { 3613 // this algo is not available 3614 } 3615 } 3616 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3617 } 3618 3619 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3620 Scan scan = new Scan().withStartRow(row); 3621 scan.setReadType(ReadType.PREAD); 3622 scan.setCaching(1); 3623 scan.setReversed(true); 3624 scan.addFamily(family); 3625 try (RegionScanner scanner = r.getScanner(scan)) { 3626 List<Cell> cells = new ArrayList<>(1); 3627 scanner.next(cells); 3628 if (cells.isEmpty() || (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0)))) { 3629 return null; 3630 } 3631 return Result.create(cells); 3632 } 3633 } 3634 3635 private boolean isTargetTable(final byte[] inRow, Cell c) { 3636 String inputRowString = Bytes.toString(inRow); 3637 int i = inputRowString.indexOf(HConstants.DELIMITER); 3638 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3639 int o = outputRowString.indexOf(HConstants.DELIMITER); 3640 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3641 } 3642 3643 /** 3644 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3645 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3646 * kerby KDC server and utility for using it, 3647 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; it 3648 * has less baggage. It came in with HBASE-5291. 3649 */ 3650 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3651 Properties conf = MiniKdc.createConf(); 3652 conf.put(MiniKdc.DEBUG, true); 3653 MiniKdc kdc = null; 3654 File dir = null; 3655 // There is a time lag between selecting a port and trying to bind to it.
It's possible that 3656 // another service captures the port in between, which will result in a BindException. 3657 boolean bindException; 3658 int numTries = 0; 3659 do { 3660 try { 3661 bindException = false; 3662 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3663 kdc = new MiniKdc(conf, dir); 3664 kdc.start(); 3665 } catch (BindException e) { 3666 FileUtils.deleteDirectory(dir); // clean directory 3667 numTries++; 3668 if (numTries == 3) { 3669 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3670 throw e; 3671 } 3672 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3673 bindException = true; 3674 } 3675 } while (bindException); 3676 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3677 return kdc; 3678 } 3679 3680 public int getNumHFiles(final TableName tableName, final byte[] family) { 3681 int numHFiles = 0; 3682 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3683 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3684 } 3685 return numHFiles; 3686 } 3687 3688 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3689 final byte[] family) { 3690 int numHFiles = 0; 3691 for (Region region : rs.getRegions(tableName)) { 3692 numHFiles += region.getStore(family).getStorefilesCount(); 3693 } 3694 return numHFiles; 3695 } 3696 3697 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3698 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3699 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3700 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3701 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3702 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3703 it2 = rtdFamilies.iterator(); it.hasNext();) { 3704 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3705 } 3706 } 3707 3708 /** 3709 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3710 * invocations. 3711 */ 3712 public static void await(final long sleepMillis, final BooleanSupplier condition) 3713 throws InterruptedException { 3714 try { 3715 while (!condition.getAsBoolean()) { 3716 Thread.sleep(sleepMillis); 3717 } 3718 } catch (RuntimeException e) { 3719 if (e.getCause() instanceof AssertionError) { 3720 throw (AssertionError) e.getCause(); 3721 } 3722 throw e; 3723 } 3724 } 3725}