/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need while testing. Manages
 * one cluster at a time only. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on the classpath and hbase-site.xml for logging and test-run
 * configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtil
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with the combinations of two booleans (memstoreTS and tags). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  static {
    // JmxCacheBuster may cause a deadlock in the test environment. On the master side, the
    // table/region related metrics updating will finally lead to a meta access, so if meta is not
    // online yet, we will block while updating and holding the metrics lock. But when we assign
    // meta, there are a bunch of places where we need to register new metrics and thus need to
    // take the metrics lock, which leads to a deadlock and causes the test to hang forever.
    // The code is in Hadoop so there is no easy way for us to fix it, so here we just stop
    // JmxCacheBuster to stabilize our tests first. See HBASE-30118 for more details and future
    // plans.
    JmxCacheBuster.stop();
  }

  /**
   * Checks to see if a specific port is available.
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();
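
  // Illustrative sketch (not part of the original class): the typical test lifecycle this
  // utility supports, assuming a JUnit 5 test class. The class name "ExampleHBaseTest" is
  // hypothetical.
  //
  //   public class ExampleHBaseTest {
  //     private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  //
  //     @BeforeAll
  //     public static void setUpBeforeClass() throws Exception {
  //       TEST_UTIL.startMiniCluster();
  //     }
  //
  //     @AfterAll
  //     public static void tearDownAfterClass() throws Exception {
  //       TEST_UTIL.shutdownMiniCluster();
  //     }
  //   }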

  /**
   * <p>
   * Create an HBaseTestingUtil using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtil using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // an hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, default it to true so tests get
    // random port assignment unless they explicitly opt out
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }
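
  // Illustrative sketch (not part of the original class): closing a test region and its WAL in a
  // finally block. How the HRegion is obtained is test-specific; the "region" variable here is
  // hypothetical. Passing null is safe since closeRegionAndWAL returns early on null.
  //
  //   HRegion region = null;
  //   try {
  //     region = ...; // e.g. created by a test helper
  //     // exercise the region
  //   } finally {
  //     HBaseTestingUtil.closeRegionAndWAL(region);
  //   }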

  /**
   * Start a secure mini cluster with the given kdc and principals.
   * @param kdc Mini kdc server
   * @param servicePrincipal Service principal without realm.
   * @param spnegoPrincipal Spnego principal without realm.
   * @return Closeable handle that shuts down the cluster
   */
  public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
    String spnegoPrincipal) throws Exception {
    Configuration conf = getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
      spnegoPrincipal + '@' + kdc.getRealm());

    startMiniCluster();
    try {
      waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      shutdownMiniCluster();
      throw e;
    }

    return this::shutdownMiniCluster;
  }
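
  // Illustrative sketch (not part of the original class): starting a secure mini cluster against
  // a MiniKdc. The kdc/keytab setup and the principal names are hypothetical; realms are appended
  // by startSecureMiniCluster itself.
  //
  //   MiniKdc kdc = ...; // assumed to be started elsewhere in the test
  //   kdc.createPrincipal(keytabFile, "hbase/localhost", "HTTP/localhost");
  //   try (Closeable cluster =
  //     TEST_UTIL.startSecureMiniCluster(kdc, "hbase/localhost", "HTTP/localhost")) {
  //     // run secured tests; the cluster is shut down when the try block exits
  //   }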

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than using the return value directly, it is usually best to make a copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but there is nothing we can do about it at the moment; single instance only
   * is how the minidfscluster works. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the conf,
   * and as a system property for hadoop.tmp.dir (we do not create them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Erasing configuration value by system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory for the
   *         test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @param subdirName name of the subdir to create under the base test dir
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }
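
  // Illustrative sketch (not part of the original class): writing a scratch file under the
  // per-test data directory on the test filesystem. The subdirectory and file names are
  // hypothetical.
  //
  //   Path dir = TEST_UTIL.getDataTestDirOnTestFS("my-test");
  //   FileSystem fs = TEST_UTIL.getTestFileSystem();
  //   try (FSDataOutputStream out = fs.create(new Path(dir, "data.txt"))) {
  //     out.writeBytes("hello");
  //   }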

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }
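
  // Illustrative sketch (not part of the original class): a stepped startup where only HDFS is
  // needed.
  //
  //   MiniDFSCluster dfs = TEST_UTIL.startMiniDFSCluster(1);
  //   try {
  //     FileSystem fs = dfs.getFileSystem();
  //     // exercise HDFS-backed code
  //   } finally {
  //     TEST_UTIL.shutdownMiniDFSCluster();
  //   }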

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; it might be because your Hadoop version is > 3.2.3 or 3.3.4, "
            + "see HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");

    // disable the metrics logger since it depends on commons-logging internal classes and we do
    // not want commons-logging on our classpath
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
    conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0);
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying this parameter on the command line. If not set, default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS
   * settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }
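
  // Illustrative note (not part of the original class): the test-only toggles read above are
  // plain system properties, so they can be set on the test JVM command line; the exact
  // invocation below is an assumption.
  //
  //   mvn test -Dhbase.tests.use.shortcircuit.reads=true \
  //     -Dhbase.tests.new.version.behavior=true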

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)} or does
   * nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through the setDFSCluster method, where we
      // will not have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region server and HDFS data node.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies the
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under the System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }
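
  // Illustrative sketch (not part of the original class): starting a mini cluster with
  // non-default sizing via StartTestingClusterOption. The TEST_UTIL instance name is
  // hypothetical.
  //
  //   StartTestingClusterOption option = StartTestingClusterOption.builder()
  //     .numMasters(2).numRegionServers(3).numDataNodes(3).build();
  //   TEST_UTIL.startMiniCluster(option);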

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from the mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up the mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts Ports that RegionServer should use.
   * @param masterClass The class to use as HMaster, or null for default.
   * @param rsClass The class to use as HRegionServer, or null for default.
   * @param createRootDir Whether to create a new root or data directory path.
   * @param createWALDir Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .rsPorts(rsPorts).createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    Connection conn = ConnectionFactory.createConnection(this.conf);
    Table t = conn.getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
    conn.close();
  }

  /**
   * Returns the current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops the mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shutdown zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }
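
  // Illustrative sketch (not part of the original class): bouncing only HBase while keeping DFS
  // and ZooKeeper up, e.g. to exercise recovery paths.
  //
  //   TEST_UTIL.shutdownMiniHBaseCluster();
  //   TEST_UTIL.restartHBaseCluster(3); // three region servers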

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or
   * not. If false, the previous path is used. Note: this does not cause the root dir to be
   * created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that the
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method. The root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that the
   * <code>create</code> flag is false.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method. The root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operation.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches for the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }
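
  // Illustrative sketch (not part of the original class): creating a table and doing a simple
  // put/get round trip. Table, row, family and qualifier names are hypothetical.
  //
  //   TableName tn = TableName.valueOf("exampleTable");
  //   byte[] cf = Bytes.toBytes("cf");
  //   try (Table table = TEST_UTIL.createTable(tn, cf)) {
  //     table.put(new Put(Bytes.toBytes("row1"))
  //       .addColumn(cf, Bytes.toBytes("q"), Bytes.toBytes("v")));
  //     Result result = table.get(new Get(Bytes.toBytes("row1")));
  //   }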

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName the table name
   * @param families the families
   * @param splitKeys the splitkeys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException throws IOException
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[] startKey, byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table.
   * @param htd table descriptor
   * @param families array of column families
   * @param splitKeys array of split keys
   * @param c Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }
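
  // Illustrative sketch (not part of the original class): creating a pre-split table from a
  // TableDescriptor. The table name, family and split point are hypothetical.
  //
  //   TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("t")).build();
  //   byte[][] families = new byte[][] { Bytes.toBytes("cf") };
  //   byte[][] splitKeys = new byte[][] { Bytes.toBytes("m") };
  //   Table table = TEST_UTIL.createTable(htd, families, splitKeys,
  //     new Configuration(TEST_UTIL.getConfiguration()));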
1369 * @throws IOException if getAdmin or createTable fails 1370 */ 1371 1372 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1373 BloomType type, int blockSize, Configuration c) throws IOException { 1374 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1375 for (byte[] family : families) { 1376 ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family) 1377 .setBloomFilterType(type).setBlocksize(blockSize); 1378 if (isNewVersionBehaviorEnabled()) { 1379 cfdb.setNewVersionBehavior(true); 1380 } 1381 builder.setColumnFamily(cfdb.build()); 1382 } 1383 TableDescriptor td = builder.build(); 1384 if (splitKeys != null) { 1385 getAdmin().createTable(td, splitKeys); 1386 } else { 1387 getAdmin().createTable(td); 1388 } 1389 // HBaseAdmin only waits for regions to appear in hbase:meta 1390 // we should wait until they are assigned 1391 waitUntilAllRegionsAssigned(td.getTableName()); 1392 return getConnection().getTable(td.getTableName()); 1393 } 1394 1395 /** 1396 * Create a table. 1397 * @param htd table descriptor 1398 * @param splitRows array of split keys 1399 * @return A Table instance for the created table. 1400 */ 1401 public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException { 1402 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1403 if (isNewVersionBehaviorEnabled()) { 1404 for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) { 1405 builder.setColumnFamily( 1406 ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build()); 1407 } 1408 } 1409 if (splitRows != null) { 1410 getAdmin().createTable(builder.build(), splitRows); 1411 } else { 1412 getAdmin().createTable(builder.build()); 1413 } 1414 // HBaseAdmin only waits for regions to appear in hbase:meta 1415 // we should wait until they are assigned 1416 waitUntilAllRegionsAssigned(htd.getTableName()); 1417 return getConnection().getTable(htd.getTableName()); 1418 } 1419 1420 /** 1421 * Create a table. 1422 * @param tableName the table name 1423 * @param families the families 1424 * @param splitKeys the split keys 1425 * @param replicaCount the replica count 1426 * @param c Configuration to use 1427 * @return A Table instance for the created table. 1428 */ 1429 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1430 int replicaCount, final Configuration c) throws IOException { 1431 TableDescriptor htd = 1432 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build(); 1433 return createTable(htd, families, splitKeys, c); 1434 } 1435 1436 /** 1437 * Create a table. 1438 * @return A Table instance for the created table. 1439 */ 1440 public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException { 1441 return createTable(tableName, new byte[][] { family }, numVersions); 1442 } 1443 1444 /** 1445 * Create a table. 1446 * @return A Table instance for the created table. 1447 */ 1448 public Table createTable(TableName tableName, byte[][] families, int numVersions) 1449 throws IOException { 1450 return createTable(tableName, families, numVersions, (byte[][]) null); 1451 } 1452 1453 /** 1454 * Create a table. 1455 * @return A Table instance for the created table. 
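* <p>
* For example (an illustrative sketch; {@code util} and the names below are hypothetical, and a
* running mini cluster is assumed):
* <pre>{@code
* Table t = util.createTable(TableName.valueOf("exampleTable"),
*   new byte[][] { Bytes.toBytes("cf") }, 3, new byte[][] { Bytes.toBytes("mmm") });
* }</pre>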
1456 */ 1457 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1458 byte[][] splitKeys) throws IOException { 1459 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1460 for (byte[] family : families) { 1461 ColumnFamilyDescriptorBuilder cfBuilder = 1462 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1463 if (isNewVersionBehaviorEnabled()) { 1464 cfBuilder.setNewVersionBehavior(true); 1465 } 1466 builder.setColumnFamily(cfBuilder.build()); 1467 } 1468 if (splitKeys != null) { 1469 getAdmin().createTable(builder.build(), splitKeys); 1470 } else { 1471 getAdmin().createTable(builder.build()); 1472 } 1473 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1474 // assigned 1475 waitUntilAllRegionsAssigned(tableName); 1476 return getConnection().getTable(tableName); 1477 } 1478 1479 /** 1480 * Create a table with multiple regions. 1481 * @return A Table instance for the created table. 1482 */ 1483 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1484 throws IOException { 1485 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1486 } 1487 1488 /** 1489 * Create a table. 1490 * @return A Table instance for the created table. 1491 */ 1492 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1493 throws IOException { 1494 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1495 for (byte[] family : families) { 1496 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1497 .setMaxVersions(numVersions).setBlocksize(blockSize); 1498 if (isNewVersionBehaviorEnabled()) { 1499 cfBuilder.setNewVersionBehavior(true); 1500 } 1501 builder.setColumnFamily(cfBuilder.build()); 1502 } 1503 getAdmin().createTable(builder.build()); 1504 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1505 // assigned 1506 waitUntilAllRegionsAssigned(tableName); 1507 return getConnection().getTable(tableName); 1508 } 1509 1510 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1511 String cpName) throws IOException { 1512 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1513 for (byte[] family : families) { 1514 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1515 .setMaxVersions(numVersions).setBlocksize(blockSize); 1516 if (isNewVersionBehaviorEnabled()) { 1517 cfBuilder.setNewVersionBehavior(true); 1518 } 1519 builder.setColumnFamily(cfBuilder.build()); 1520 } 1521 if (cpName != null) { 1522 builder.setCoprocessor(cpName); 1523 } 1524 getAdmin().createTable(builder.build()); 1525 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1526 // assigned 1527 waitUntilAllRegionsAssigned(tableName); 1528 return getConnection().getTable(tableName); 1529 } 1530 1531 /** 1532 * Create a table. 1533 * @return A Table instance for the created table. 
1534 */ 1535 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1536 throws IOException { 1537 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1538 int i = 0; 1539 for (byte[] family : families) { 1540 ColumnFamilyDescriptorBuilder cfBuilder = 1541 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1542 if (isNewVersionBehaviorEnabled()) { 1543 cfBuilder.setNewVersionBehavior(true); 1544 } 1545 builder.setColumnFamily(cfBuilder.build()); 1546 i++; 1547 } 1548 getAdmin().createTable(builder.build()); 1549 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1550 // assigned 1551 waitUntilAllRegionsAssigned(tableName); 1552 return getConnection().getTable(tableName); 1553 } 1554 1555 /** 1556 * Create a table. 1557 * @return A Table instance for the created table. 1558 */ 1559 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1560 throws IOException { 1561 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1562 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1563 if (isNewVersionBehaviorEnabled()) { 1564 cfBuilder.setNewVersionBehavior(true); 1565 } 1566 builder.setColumnFamily(cfBuilder.build()); 1567 getAdmin().createTable(builder.build(), splitRows); 1568 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1569 // assigned 1570 waitUntilAllRegionsAssigned(tableName); 1571 return getConnection().getTable(tableName); 1572 } 1573 1574 /** 1575 * Create a table with multiple regions. 1576 * @return A Table instance for the created table. 1577 */ 1578 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1579 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1580 } 1581 1582 /** 1583 * Set the number of Region replicas. 1584 */ 1585 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1586 throws IOException, InterruptedException { 1587 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1588 .setRegionReplication(replicaCount).build(); 1589 admin.modifyTable(desc); 1590 } 1591 1592 /** 1593 * Set the number of Region replicas. 
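* <p>
* Illustrative use (a sketch only; assumes a running cluster, an {@code AsyncAdmin} handle named
* {@code asyncAdmin}, and a hypothetical table name):
* <pre>{@code
* HBaseTestingUtil.setReplicas(asyncAdmin, TableName.valueOf("exampleTable"), 3);
* }</pre>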
1594 */ 1595 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1596 throws ExecutionException, IOException, InterruptedException { 1597 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1598 .setRegionReplication(replicaCount).build(); 1599 admin.modifyTable(desc).get(); 1600 } 1601 1602 /** 1603 * Drop an existing table 1604 * @param tableName existing table 1605 */ 1606 public void deleteTable(TableName tableName) throws IOException { 1607 try { 1608 getAdmin().disableTable(tableName); 1609 } catch (TableNotEnabledException e) { 1610 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1611 } 1612 getAdmin().deleteTable(tableName); 1613 } 1614 1615 /** 1616 * Drop an existing table 1617 * @param tableName existing table 1618 */ 1619 public void deleteTableIfAny(TableName tableName) throws IOException { 1620 try { 1621 deleteTable(tableName); 1622 } catch (TableNotFoundException e) { 1623 // ignore 1624 } 1625 } 1626 1627 // ========================================================================== 1628 // Canned table and table descriptor creation 1629 1630 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1631 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1632 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1633 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1634 private static final int MAXVERSIONS = 3; 1635 1636 public static final char FIRST_CHAR = 'a'; 1637 public static final char LAST_CHAR = 'z'; 1638 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1639 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1640 1641 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1642 return createModifyableTableDescriptor(TableName.valueOf(name), 1643 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1644 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1645 } 1646 1647 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1648 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1649 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1650 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1651 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1652 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1653 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1654 if (isNewVersionBehaviorEnabled()) { 1655 cfBuilder.setNewVersionBehavior(true); 1656 } 1657 builder.setColumnFamily(cfBuilder.build()); 1658 } 1659 return builder.build(); 1660 } 1661 1662 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1663 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1664 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1665 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1666 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1667 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1668 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1669 if (isNewVersionBehaviorEnabled()) { 1670 cfBuilder.setNewVersionBehavior(true); 1671 } 1672 builder.setColumnFamily(cfBuilder.build()); 
1673 }
1674 return builder;
1675 }
1676
1677 /**
1678 * Create a table descriptor for a table of name <code>name</code>.
1679 * @param name Name to give table.
1680 * @return Table descriptor.
1681 */
1682 public TableDescriptor createTableDescriptor(final TableName name) {
1683 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1684 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1685 }
1686
1687 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1688 return createTableDescriptor(tableName, new byte[][] { family }, 1);
1689 }
1690
1691 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1692 int maxVersions) {
1693 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1694 for (byte[] family : families) {
1695 ColumnFamilyDescriptorBuilder cfBuilder =
1696 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1697 if (isNewVersionBehaviorEnabled()) {
1698 cfBuilder.setNewVersionBehavior(true);
1699 }
1700 builder.setColumnFamily(cfBuilder.build());
1701 }
1702 return builder.build();
1703 }
1704
1705 /**
1706 * Create an HRegion that writes to the local tmp dirs
1707 * @param desc a table descriptor indicating which table the region belongs to
1708 * @param startKey the start boundary of the region
1709 * @param endKey the end boundary of the region
1710 * @return a region that writes to local dir for testing
1711 */
1712 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1713 throws IOException {
1714 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1715 .setEndKey(endKey).build();
1716 return createLocalHRegion(hri, desc);
1717 }
1718
1719 /**
1720 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1721 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
1722 */
1723 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1724 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1725 }
1726
1727 /**
1728 * Create an HRegion that writes to the local tmp dirs with specified wal
1729 * @param info regioninfo
1730 * @param conf configuration
1731 * @param desc table descriptor
1732 * @param wal wal for this region.
1733 * @return created hregion
1734 */
1735 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1736 WAL wal) throws IOException {
1737 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1738 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1739 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1740 }
1741
1742 /**
1743 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1744 * when done.
1745 */
1746 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1747 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1748 throws IOException {
1749 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1750 durability, wal, null, families);
1751 }
1752
1753 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1754 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1755 boolean[] compactedMemStore, byte[]... families) throws IOException {
1756 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1757 builder.setReadOnly(isReadOnly);
1758 int i = 0;
1759 for (byte[] family : families) {
1760 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1761 if (compactedMemStore != null && i < compactedMemStore.length) {
1762 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1763 } else {
1764 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1765
1766 }
1767 i++;
1768 // Keep every version (Integer.MAX_VALUE); tests using this helper manage versions themselves.
1769 cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1770 builder.setColumnFamily(cfBuilder.build());
1771 }
1772 builder.setDurability(durability);
1773 RegionInfo info =
1774 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1775 return createLocalHRegion(info, conf, builder.build(), wal);
1776 }
1777
1778 //
1779 // ==========================================================================
1780
1781 /**
1782 * Provide an existing table name to empty of rows. Scans the table and issues a delete for each
1783 * row read.
1784 * @param tableName existing table
1785 * @return Table instance for the (now empty) table
1786 */
1787 public Table deleteTableData(TableName tableName) throws IOException {
1788 Table table = getConnection().getTable(tableName);
1789 Scan scan = new Scan();
1790 try (ResultScanner resScan = table.getScanner(scan)) {
1791 for (Result res : resScan) {
1792 Delete del = new Delete(res.getRow());
1793 table.delete(del);
1794 }
1795 }
1796 return table;
1797 }
1798
1799
1800 /**
1801 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1802 * table.
1803 * @param tableName table which must exist.
1804 * @param preserveRegions keep the existing split points
1805 * @return Table for the new table
1806 */
1807 public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1808 throws IOException {
1809 Admin admin = getAdmin();
1810 if (!admin.isTableDisabled(tableName)) {
1811 admin.disableTable(tableName);
1812 }
1813 admin.truncateTable(tableName, preserveRegions);
1814 return getConnection().getTable(tableName);
1815 }
1816
1817 /**
1818 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1819 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1820 * preserve regions of existing table.
1821 * @param tableName table which must exist.
1822 * @return Table for the new table
1823 */
1824 public Table truncateTable(final TableName tableName) throws IOException {
1825 return truncateTable(tableName, false);
1826 }
1827
1828 /**
1829 * Load table with rows from 'aaa' to 'zzz'.
1830 * @param t Table
1831 * @param f Family
1832 * @return Count of rows loaded.
1833 */
1834 public int loadTable(final Table t, final byte[] f) throws IOException {
1835 return loadTable(t, new byte[][] { f });
1836 }
1837
1838 /**
1839 * Load table with rows from 'aaa' to 'zzz'.
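* <p>
* Typical use (an illustrative sketch; {@code util}, {@code table}, and the family name are
* hypothetical):
* <pre>{@code
* int rows = util.loadTable(table, Bytes.toBytes("cf"), false); // false = skip the WAL
* // rows == 26 * 26 * 26, one row per key from "aaa" to "zzz"
* }</pre>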
1840 * @param t Table 1841 * @param f Family 1842 * @return Count of rows loaded. 1843 */ 1844 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1845 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1846 } 1847 1848 /** 1849 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1850 * @param t Table 1851 * @param f Array of Families to load 1852 * @return Count of rows loaded. 1853 */ 1854 public int loadTable(final Table t, final byte[][] f) throws IOException { 1855 return loadTable(t, f, null); 1856 } 1857 1858 /** 1859 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1860 * @param t Table 1861 * @param f Array of Families to load 1862 * @param value the values of the cells. If null is passed, the row key is used as value 1863 * @return Count of rows loaded. 1864 */ 1865 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1866 return loadTable(t, f, value, true); 1867 } 1868 1869 /** 1870 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1871 * @param t Table 1872 * @param f Array of Families to load 1873 * @param value the values of the cells. If null is passed, the row key is used as value 1874 * @return Count of rows loaded. 1875 */ 1876 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1877 throws IOException { 1878 List<Put> puts = new ArrayList<>(); 1879 for (byte[] row : HBaseTestingUtil.ROWS) { 1880 Put put = new Put(row); 1881 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1882 for (int i = 0; i < f.length; i++) { 1883 byte[] value1 = value != null ? value : row; 1884 put.addColumn(f[i], f[i], value1); 1885 } 1886 puts.add(put); 1887 } 1888 t.put(puts); 1889 return puts.size(); 1890 } 1891 1892 /** 1893 * A tracker for tracking and validating table rows generated with 1894 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1895 */ 1896 public static class SeenRowTracker { 1897 int dim = 'z' - 'a' + 1; 1898 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1899 byte[] startRow; 1900 byte[] stopRow; 1901 1902 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1903 this.startRow = startRow; 1904 this.stopRow = stopRow; 1905 } 1906 1907 void reset() { 1908 for (byte[] row : ROWS) { 1909 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1910 } 1911 } 1912 1913 int i(byte b) { 1914 return b - 'a'; 1915 } 1916 1917 public void addRow(byte[] row) { 1918 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1919 } 1920 1921 /** 1922 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1923 * rows none 1924 */ 1925 public void validate() { 1926 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1927 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1928 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1929 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1930 int expectedCount = 0; 1931 if ( 1932 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1933 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1934 ) { 1935 expectedCount = 1; 1936 } 1937 if (count != expectedCount) { 1938 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1939 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1940 + "instead of " + expectedCount); 1941 } 1942 } 1943 } 1944 } 1945 } 1946 } 1947 1948 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1949 return 
loadRegion(r, f, false); 1950 } 1951 1952 public int loadRegion(final Region r, final byte[] f) throws IOException { 1953 return loadRegion((HRegion) r, f); 1954 } 1955 1956 /** 1957 * Load region with rows from 'aaa' to 'zzz'. 1958 * @param r Region 1959 * @param f Family 1960 * @param flush flush the cache if true 1961 * @return Count of rows loaded. 1962 */ 1963 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1964 byte[] k = new byte[3]; 1965 int rowCount = 0; 1966 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1967 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1968 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1969 k[0] = b1; 1970 k[1] = b2; 1971 k[2] = b3; 1972 Put put = new Put(k); 1973 put.setDurability(Durability.SKIP_WAL); 1974 put.addColumn(f, null, k); 1975 if (r.getWAL() == null) { 1976 put.setDurability(Durability.SKIP_WAL); 1977 } 1978 int preRowCount = rowCount; 1979 int pause = 10; 1980 int maxPause = 1000; 1981 while (rowCount == preRowCount) { 1982 try { 1983 r.put(put); 1984 rowCount++; 1985 } catch (RegionTooBusyException e) { 1986 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1987 Threads.sleep(pause); 1988 } 1989 } 1990 } 1991 } 1992 if (flush) { 1993 r.flush(true); 1994 } 1995 } 1996 return rowCount; 1997 } 1998 1999 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2000 throws IOException { 2001 for (int i = startRow; i < endRow; i++) { 2002 byte[] data = Bytes.toBytes(String.valueOf(i)); 2003 Put put = new Put(data); 2004 put.addColumn(f, null, data); 2005 t.put(put); 2006 } 2007 } 2008 2009 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 2010 throws IOException { 2011 for (int i = 0; i < totalRows; i++) { 2012 byte[] row = new byte[rowSize]; 2013 Bytes.random(row); 2014 Put put = new Put(row); 2015 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 2016 t.put(put); 2017 } 2018 } 2019 2020 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 2021 int replicaId) throws IOException { 2022 for (int i = startRow; i < endRow; i++) { 2023 String failMsg = "Failed verification of row :" + i; 2024 byte[] data = Bytes.toBytes(String.valueOf(i)); 2025 Get get = new Get(data); 2026 get.setReplicaId(replicaId); 2027 get.setConsistency(Consistency.TIMELINE); 2028 Result result = table.get(get); 2029 assertTrue(result.containsColumn(f, null), failMsg); 2030 assertEquals(1, result.getColumnCells(f, null).size(), failMsg); 2031 Cell cell = result.getColumnLatestCell(f, null); 2032 assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), 2033 cell.getValueLength()), failMsg); 2034 } 2035 } 2036 2037 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2038 throws IOException { 2039 verifyNumericRows((HRegion) region, f, startRow, endRow); 2040 } 2041 2042 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2043 throws IOException { 2044 verifyNumericRows(region, f, startRow, endRow, true); 2045 } 2046 2047 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2048 final boolean present) throws IOException { 2049 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2050 } 2051 2052 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2053 final boolean present) throws IOException { 2054 for (int i = startRow; i < endRow; i++) { 2055 String failMsg = "Failed 
verification of row :" + i; 2056 byte[] data = Bytes.toBytes(String.valueOf(i)); 2057 Result result = region.get(new Get(data)); 2058 2059 boolean hasResult = result != null && !result.isEmpty(); 2060 assertEquals(present, hasResult, failMsg + result); 2061 if (!present) continue; 2062 2063 assertTrue(result.containsColumn(f, null), failMsg); 2064 assertEquals(1, result.getColumnCells(f, null).size(), failMsg); 2065 Cell cell = result.getColumnLatestCell(f, null); 2066 assertTrue(Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), 2067 cell.getValueLength()), failMsg); 2068 } 2069 } 2070 2071 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2072 throws IOException { 2073 for (int i = startRow; i < endRow; i++) { 2074 byte[] data = Bytes.toBytes(String.valueOf(i)); 2075 Delete delete = new Delete(data); 2076 delete.addFamily(f); 2077 t.delete(delete); 2078 } 2079 } 2080 2081 /** 2082 * Return the number of rows in the given table. 2083 * @param table to count rows 2084 * @return count of rows 2085 */ 2086 public static int countRows(final Table table) throws IOException { 2087 return countRows(table, new Scan()); 2088 } 2089 2090 public static int countRows(final Table table, final Scan scan) throws IOException { 2091 try (ResultScanner results = table.getScanner(scan)) { 2092 int count = 0; 2093 while (results.next() != null) { 2094 count++; 2095 } 2096 return count; 2097 } 2098 } 2099 2100 public static int countRows(final Table table, final byte[]... families) throws IOException { 2101 Scan scan = new Scan(); 2102 for (byte[] family : families) { 2103 scan.addFamily(family); 2104 } 2105 return countRows(table, scan); 2106 } 2107 2108 /** 2109 * Return the number of rows in the given table. 2110 */ 2111 public int countRows(final TableName tableName) throws IOException { 2112 try (Table table = getConnection().getTable(tableName)) { 2113 return countRows(table); 2114 } 2115 } 2116 2117 public static int countRows(final Region region) throws IOException { 2118 return countRows(region, new Scan()); 2119 } 2120 2121 public static int countRows(final Region region, final Scan scan) throws IOException { 2122 try (InternalScanner scanner = region.getScanner(scan)) { 2123 return countRows(scanner); 2124 } 2125 } 2126 2127 public static int countRows(final InternalScanner scanner) throws IOException { 2128 int scannedCount = 0; 2129 List<Cell> results = new ArrayList<>(); 2130 boolean hasMore = true; 2131 while (hasMore) { 2132 hasMore = scanner.next(results); 2133 scannedCount += results.size(); 2134 results.clear(); 2135 } 2136 return scannedCount; 2137 } 2138 2139 /** 2140 * Return an md5 digest of the entire contents of a table. 
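* The digest is computed over the row keys in scan order, so two tables with the same set of row
* keys produce the same checksum. Illustrative comparison (the table handles are assumed):
* <pre>{@code
* assertEquals(util.checksumRows(expectedTable), util.checksumRows(actualTable));
* }</pre>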
2141 */
2142 public String checksumRows(final Table table) throws Exception {
2143 MessageDigest digest = MessageDigest.getInstance("MD5");
2144 try (ResultScanner results = table.getScanner(new Scan())) {
2145 for (Result res : results) {
2146 digest.update(res.getRow());
2147 }
2148 }
2149 return Bytes.toHex(digest.digest()); // MessageDigest#toString() would not include the digest bytes
2150 }
2151
2152 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2153 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2154 static {
2155 int i = 0;
2156 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2157 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2158 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2159 ROWS[i][0] = b1;
2160 ROWS[i][1] = b2;
2161 ROWS[i][2] = b3;
2162 i++;
2163 }
2164 }
2165 }
2166 }
2167
2168 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2169 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2170 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2171 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2172 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2173 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2174 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2175
2176 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2177 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2178 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2179 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2180 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2181 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2182 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2183
2184 /**
2185 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2186 * first startKey should be a 0 length byte array if you want to form a proper range of regions.
2187 * @return list of region info for regions added to meta
2188 */
2189 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2190 final TableDescriptor htd, byte[][] startKeys) throws IOException {
2191 try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2192 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2193 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2194 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2195 TableState.State.ENABLED);
2196 // add custom ones
2197 for (int i = 0; i < startKeys.length; i++) {
2198 int j = (i + 1) % startKeys.length;
2199 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2200 .setEndKey(startKeys[j]).build();
2201 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2202 newRegions.add(hri);
2203 }
2204 return newRegions;
2205 }
2206 }
2207
2208 /**
2209 * Create an unmanaged WAL. Be sure to close it when you're through.
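* <p>
* Sketch of the expected lifecycle (illustrative; {@code conf}, {@code rootDir}, and
* {@code regionInfo} are assumed to exist in the caller):
* <pre>{@code
* WAL wal = HBaseTestingUtil.createWal(conf, rootDir, regionInfo);
* try {
*   // e.g. hand the WAL to createLocalHRegion(info, conf, desc, wal)
* } finally {
*   wal.close();
* }
* }</pre>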
2210 */
2211 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2212 throws IOException {
2213 // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2214 // unless we pass it along via the conf.
2215 Configuration confForWAL = new Configuration(conf);
2216 CommonFSUtils.setRootDir(confForWAL, rootDir);
2217 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.insecure().nextNumeric(8))
2218 .getWAL(hri);
2219 }
2220
2221 /**
2222 * Create a region with its own WAL. Be sure to call
2223 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2224 */
2225 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2226 final Configuration conf, final TableDescriptor htd) throws IOException {
2227 return createRegionAndWAL(info, rootDir, conf, htd, true);
2228 }
2229
2230 /**
2231 * Create a region with its own WAL. Be sure to call
2232 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2233 */
2234 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2235 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2236 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2237 region.setBlockCache(blockCache);
2238 region.initialize();
2239 return region;
2240 }
2241
2242 /**
2243 * Create a region with its own WAL. Be sure to call
2244 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2245 */
2246 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2247 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2248 throws IOException {
2249 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2250 region.setMobFileCache(mobFileCache);
2251 region.initialize();
2252 return region;
2253 }
2254
2255 /**
2256 * Create a region with its own WAL. Be sure to call
2257 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2258 */
2259 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2260 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2261 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2262 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2263 WAL wal = createWal(conf, rootDir, info);
2264 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2265 }
2266
2267 /**
2268 * Find a region server other than the one passed in.
2269 * @return another region server, or null if there is no other one
2270 */
2271 public HRegionServer getOtherRegionServer(HRegionServer rs) {
2272 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
2273 if (rst.getRegionServer() != rs) {
2274 return rst.getRegionServer();
2275 }
2276 }
2277 return null;
2278 }
2279
2280 /**
2281 * Tool to get the reference to the region server object that holds the region of the specified
2282 * user table.
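* <p>
* For example (an illustrative sketch; assumes the table exists on the mini cluster and
* {@code util} is the testing-utility instance):
* <pre>{@code
* HRegionServer rs = util.getRSForFirstRegionInTable(TableName.valueOf("exampleTable"));
* }</pre>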
2283 * @param tableName user table to lookup in hbase:meta
2284 * @return region server that holds it, null if the row doesn't exist
2285 */
2286 public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2287 throws IOException, InterruptedException {
2288 List<RegionInfo> regions = getAdmin().getRegions(tableName);
2289 if (regions == null || regions.isEmpty()) {
2290 return null;
2291 }
2292 LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2293
2294 byte[] firstRegionName =
2295 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2296 .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2297
2298 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2299 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2300 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2301 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2302 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2303 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS); // HBASE_CLIENT_PAUSE is in ms
2304 while (retrier.shouldRetry()) {
2305 int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2306 if (index != -1) {
2307 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2308 }
2309 // Came back -1. Region may not be online yet. Sleep a while.
2310 retrier.sleepUntilNextRetry();
2311 }
2312 return null;
2313 }
2314
2315 /**
2316 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2317 * MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start multiple
2318 * MiniMRCluster instances with different log dirs.
2319 * @throws IOException When starting the cluster fails.
2320 */
2321 public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2322 // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
2323 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2324 "99.0");
2325 startMiniMapReduceCluster(2);
2326 return mrCluster;
2327 }
2328
2329 /**
2330 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2331 * filesystem. MiniMRCluster caches hadoop.log.dir when first started. It is not possible to start
2332 * multiple MiniMRCluster instances with different log dirs.
2333 * @param servers The number of <code>TaskTracker</code>s to start.
2334 * @throws IOException When starting the cluster fails.
2335 */
2336 private void startMiniMapReduceCluster(final int servers) throws IOException {
2337 if (mrCluster != null) {
2338 throw new IllegalStateException("MiniMRCluster is already running");
2339 }
2340 LOG.info("Starting mini mapreduce cluster...");
2341 setupClusterTestDir();
2342 createDirsAndSetProperties();
2343
2344 //// hadoop2 specific settings
2345 // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2346 // We up the usable VM so that processes don't get killed.
2347 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2348
2349 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2350 // this avoids the problem by disabling speculative task execution in tests.
2351 conf.setBoolean("mapreduce.map.speculative", false);
2352 conf.setBoolean("mapreduce.reduce.speculative", false);
2353 ////
2354
2355 // Yarn container runs in an independent JVM.
We need to pass the argument manually here if the 2356 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 2357 if (JVM.getJVMSpecVersion() >= 17) { 2358 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2359 conf.set("yarn.app.mapreduce.am.command-opts", 2360 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2361 } 2362 2363 // Disable fs cache for DistributedFileSystem, to avoid YARN RM close the shared FileSystem 2364 // instance and cause trouble in HBase. See HBASE-29802 for more details. 2365 JobConf mrClusterConf = new JobConf(conf); 2366 mrClusterConf.setBoolean("fs.hdfs.impl.disable.cache", true); 2367 // Allow the user to override FS URI for this map-reduce cluster to use. 2368 mrCluster = 2369 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2370 1, null, null, mrClusterConf); 2371 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2372 if (jobConf == null) { 2373 jobConf = mrCluster.createJobConf(); 2374 } 2375 2376 // Hadoop MiniMR overwrites this while it should not 2377 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2378 LOG.info("Mini mapreduce cluster started"); 2379 2380 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2381 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2382 // necessary config properties here. YARN-129 required adding a few properties. 2383 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2384 // this for mrv2 support; mr1 ignores this 2385 conf.set("mapreduce.framework.name", "yarn"); 2386 conf.setBoolean("yarn.is.minicluster", true); 2387 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2388 if (rmAddress != null) { 2389 conf.set("yarn.resourcemanager.address", rmAddress); 2390 } 2391 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2392 if (historyAddress != null) { 2393 conf.set("mapreduce.jobhistory.address", historyAddress); 2394 } 2395 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2396 if (schedulerAddress != null) { 2397 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2398 } 2399 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2400 if (mrJobHistoryWebappAddress != null) { 2401 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2402 } 2403 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2404 if (yarnRMWebappAddress != null) { 2405 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2406 } 2407 } 2408 2409 /** 2410 * Stops the previously started <code>MiniMRCluster</code>. 2411 */ 2412 public void shutdownMiniMapReduceCluster() { 2413 if (mrCluster != null) { 2414 LOG.info("Stopping mini mapreduce cluster..."); 2415 mrCluster.shutdown(); 2416 mrCluster = null; 2417 LOG.info("Mini mapreduce cluster stopped"); 2418 } 2419 // Restore configuration to point to local jobtracker 2420 conf.set("mapreduce.jobtracker.address", "local"); 2421 } 2422 2423 /** 2424 * Create a stubbed out RegionServerService, mainly for getting FS. 2425 */ 2426 public RegionServerServices createMockRegionServerService() throws IOException { 2427 return createMockRegionServerService((ServerName) null); 2428 } 2429 2430 /** 2431 * Create a stubbed out RegionServerService, mainly for getting FS. 
This version is used by
2432 * TestTokenAuthentication
2433 */
2434 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2435 throws IOException {
2436 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2437 rss.setFileSystem(getTestFileSystem());
2438 rss.setRpcServer(rpc);
2439 return rss;
2440 }
2441
2442 /**
2443 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2444 * TestOpenRegionHandler
2445 */
2446 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2447 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2448 rss.setFileSystem(getTestFileSystem());
2449 return rss;
2450 }
2451
2452 /**
2453 * Expire the Master's session
2454 */
2455 public void expireMasterSession() throws Exception {
2456 HMaster master = getMiniHBaseCluster().getMaster();
2457 expireSession(master.getZooKeeper(), false);
2458 }
2459
2460 /**
2461 * Expire a region server's session
2462 * @param index which RS
2463 */
2464 public void expireRegionServerSession(int index) throws Exception {
2465 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2466 expireSession(rs.getZooKeeper(), false);
2467 decrementMinRegionServerCount();
2468 }
2469
2470 private void decrementMinRegionServerCount() {
2471 // decrement the count for this.conf, for the newly spawned master;
2472 // this.hbaseCluster shares this configuration too
2473 decrementMinRegionServerCount(getConfiguration());
2474
2475 // each master thread keeps a copy of configuration
2476 for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2477 decrementMinRegionServerCount(master.getMaster().getConfiguration());
2478 }
2479 }
2480
2481 private void decrementMinRegionServerCount(Configuration conf) {
2482 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2483 if (currentCount != -1) {
2484 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2485 }
2486 }
2487
2488 public void expireSession(ZKWatcher nodeZK) throws Exception {
2489 expireSession(nodeZK, false);
2490 }
2491
2492 /**
2493 * Expire a ZooKeeper session as recommended in ZooKeeper documentation <a href=
2494 * "https://hbase.apache.org/docs/troubleshooting#troubleshooting-zookeeper">Troubleshooting
2495 * ZooKeeper</a>
2496 * <p/>
2497 * There are issues when doing this:
2498 * <ol>
2499 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2500 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2501 * </ol>
2502 * @param nodeZK - the ZK watcher to expire
2503 * @param checkStatus - true to check if we can create a Table with the current configuration.
2504 */
2505 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2506 Configuration c = new Configuration(this.conf);
2507 String quorumServers = ZKConfig.getZKQuorumServersString(c);
2508 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2509 byte[] password = zk.getSessionPasswd();
2510 long sessionID = zk.getSessionId();
2511
2512 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2513 // so we create a first watcher to be sure that the
2514 // event was sent. We expect that if our watcher receives the event,
2515 // other watchers on the same machine will get it as well.
2516 // When we ask to close the connection, ZK does not close it before
2517 // we receive all the events, so we don't have to capture the event;
2518 // closing the connection should be enough.
2519 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2520 @Override
2521 public void process(WatchedEvent watchedEvent) {
2522 LOG.info("Monitor ZKW received event=" + watchedEvent);
2523 }
2524 }, sessionID, password);
2525
2526 // Making it expire
2527 ZooKeeper newZK =
2528 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2529
2530 // ensure that we have connection to the server before closing down, otherwise
2531 // the close session event will be eaten before we reach the CONNECTING state
2532 long start = EnvironmentEdgeManager.currentTime();
2533 while (
2534 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2535 ) {
2536 Thread.sleep(1);
2537 }
2538 newZK.close();
2539 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2540
2541 // Now closing & waiting to be sure that the clients get it.
2542 monitor.close();
2543
2544 if (checkStatus) {
2545 getConnection().getTable(TableName.META_TABLE_NAME).close();
2546 }
2547 }
2548
2549 /**
2550 * Get the Mini HBase cluster.
2551 * @return hbase cluster
2552 * @see #getHBaseClusterInterface()
2553 */
2554 public SingleProcessHBaseCluster getHBaseCluster() {
2555 return getMiniHBaseCluster();
2556 }
2557
2558 /**
2559 * Returns the HBaseCluster instance.
2560 * <p>
2561 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring to this
2562 * should not assume that the cluster is a mini cluster or a distributed one. If the test only
2563 * works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be used
2564 * instead w/o the need to type-cast.
2565 */
2566 public HBaseClusterInterface getHBaseClusterInterface() {
2567 // implementation note: we should rename this method as #getHBaseCluster(),
2568 // but this would require refactoring 90+ calls.
2569 return hbaseCluster;
2570 }
2571
2572 /**
2573 * Resets the connections so that the next time getConnection() is called, a new connection is
2574 * created. This is needed in cases where the entire cluster / all the masters are shut down and
2575 * the connection is not valid anymore.
2576 * <p/>
2577 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2578 * written, not all start() stop() calls go through this class. Most tests directly operate on the
2579 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2580 * the connection state automatically. Cleaning this is a much bigger refactor.
2581 */
2582 public void invalidateConnection() throws IOException {
2583 closeConnection();
2584 // Update the master addresses if they changed.
2585 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2586 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2587 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2588 masterConfigBefore, masterConfAfter);
2589 conf.set(HConstants.MASTER_ADDRS_KEY,
2590 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
2591 }
2592
2593 /**
2594 * Get a shared Connection to the cluster. This method is thread safe.
2595 * @return A Connection that can be shared. Don't close.
Will be closed on shutdown of the cluster.
2596 */
2597 public Connection getConnection() throws IOException {
2598 return getAsyncConnection().toConnection();
2599 }
2600
2601 /**
2602 * Get an assigned Connection to the cluster. This method is thread safe.
2603 * @param user assigned user
2604 * @return A Connection with the assigned user.
2605 */
2606 public Connection getConnection(User user) throws IOException {
2607 return getAsyncConnection(user).toConnection();
2608 }
2609
2610 /**
2611 * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
2612 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2613 * of the cluster.
2614 */
2615 public AsyncClusterConnection getAsyncConnection() throws IOException {
2616 try {
2617 return asyncConnection.updateAndGet(connection -> {
2618 if (connection == null) {
2619 try {
2620 User user = UserProvider.instantiate(conf).getCurrent();
2621 connection = getAsyncConnection(user);
2622 } catch (IOException ioe) {
2623 throw new UncheckedIOException("Failed to create connection", ioe);
2624 }
2625 }
2626 return connection;
2627 });
2628 } catch (UncheckedIOException exception) {
2629 throw exception.getCause();
2630 }
2631 }
2632
2633 /**
2634 * Get an assigned AsyncClusterConnection to the cluster. This method is thread safe.
2635 * @param user assigned user
2636 * @return An AsyncClusterConnection with the assigned user.
2637 */
2638 public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2639 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2640 }
2641
2642 public void closeConnection() throws IOException {
2643 if (hbaseAdmin != null) {
2644 Closeables.close(hbaseAdmin, true);
2645 hbaseAdmin = null;
2646 }
2647 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2648 if (asyncConnection != null) {
2649 Closeables.close(asyncConnection, true);
2650 }
2651 }
2652
2653 /**
2654 * Returns an Admin instance which is shared between HBaseTestingUtil instance users. Closing
2655 * it has no effect; it will be closed automatically when the cluster shuts down.
2656 */
2657 public Admin getAdmin() throws IOException {
2658 if (hbaseAdmin == null) {
2659 this.hbaseAdmin = getConnection().getAdmin();
2660 }
2661 return hbaseAdmin;
2662 }
2663
2664 private Admin hbaseAdmin = null;
2665
2666 /**
2667 * Returns an {@link Hbck} instance. Needs to be closed when done.
2668 */
2669 public Hbck getHbck() throws IOException {
2670 return getConnection().getHbck();
2671 }
2672
2673 /**
2674 * Unassign the named region.
2675 * @param regionName The region to unassign.
2676 */
2677 public void unassignRegion(String regionName) throws IOException {
2678 unassignRegion(Bytes.toBytes(regionName));
2679 }
2680
2681 /**
2682 * Unassign the named region.
2683 * @param regionName The region to unassign.
2684 */
2685 public void unassignRegion(byte[] regionName) throws IOException {
2686 getAdmin().unassign(regionName);
2687 }
2688
2689 /**
2690 * Closes the region containing the given row.
2691 * @param row The row to find the containing region.
2692 * @param table The table to find the region.
2693 */
2694 public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2695 unassignRegionByRow(Bytes.toBytes(row), table);
2696 }
2697
2698 /**
2699 * Closes the region containing the given row.
2700 * @param row The row to find the containing region.
2701 * @param table The table to find the region.
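* <p>
* For example (an illustrative sketch; the row value and table name are hypothetical):
* <pre>{@code
* TableName tn = TableName.valueOf("exampleTable");
* try (RegionLocator locator = util.getConnection().getRegionLocator(tn)) {
*   util.unassignRegionByRow("row-0001", locator);
* }
* }</pre>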
2702 */
2703 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2704 HRegionLocation hrl = table.getRegionLocation(row);
2705 unassignRegion(hrl.getRegion().getRegionName());
2706 }
2707
2708 /**
2709 * Retrieves a splittable region randomly from tableName
2710 * @param tableName name of table
2711 * @param maxAttempts maximum number of attempts, unlimited for value of -1
2712 * @return the HRegion chosen, null if none was found within limit of maxAttempts
2713 */
2714 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2715 List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2716 int regCount = regions.size();
2717 Set<Integer> attempted = new HashSet<>();
2718 int idx;
2719 int attempts = 0;
2720 do {
2721 regions = getHBaseCluster().getRegions(tableName);
2722 if (regCount != regions.size()) {
2723 // if there was region movement, clear attempted Set
2724 attempted.clear();
2725 }
2726 regCount = regions.size();
2727 // There are chances that before we get the region for the table from an RS the region may
2728 // be going for CLOSE. This may be because online schema change is enabled
2729 if (regCount > 0) {
2730 idx = ThreadLocalRandom.current().nextInt(regCount);
2731 // if we have just tried this region, there is no need to try again
2732 if (attempted.contains(idx)) {
2733 continue;
2734 }
2735 HRegion region = regions.get(idx);
2736 if (region.checkSplit().isPresent()) {
2737 return region;
2738 }
2739 attempted.add(idx);
2740 }
2741 attempts++;
2742 } while (maxAttempts == -1 || attempts < maxAttempts);
2743 return null;
2744 }
2745
2746 public MiniDFSCluster getDFSCluster() {
2747 return dfsCluster;
2748 }
2749
2750 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2751 setDFSCluster(cluster, true);
2752 }
2753
2754 /**
2755 * Set the MiniDFSCluster
2756 * @param cluster cluster to use
2757 * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before it
2758 * is set.
2759 * @throws IllegalStateException if the passed cluster is up when it is required to be down
2760 * @throws IOException if the FileSystem could not be set from the passed dfs cluster
2761 */
2762 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2763 throws IllegalStateException, IOException {
2764 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2765 throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2766 }
2767 this.dfsCluster = cluster;
2768 this.setFs();
2769 }
2770
2771 public FileSystem getTestFileSystem() throws IOException {
2772 return HFileSystem.get(conf);
2773 }
2774
2775 /**
2776 * Wait until all regions in a table have been assigned. Waits the default timeout (30 seconds)
2777 * before giving up.
2778 * @param table Table to wait on.
2779 */
2780 public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2781 waitTableAvailable(table.getName(), 30000);
2782 }
2783
2784 public void waitTableAvailable(TableName table, long timeoutMillis)
2785 throws InterruptedException, IOException {
2786 waitFor(timeoutMillis, predicateTableAvailable(table));
2787 }
2788
2789 /**
2790 * Wait until all regions in a table have been assigned
2791 * @param table Table to wait on.
2792 * @param timeoutMillis Timeout.
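* <p>
* For example (an illustrative sketch; the table name is hypothetical):
* <pre>{@code
* util.waitTableAvailable(Bytes.toBytes("exampleTable"), 30000);
* }</pre>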
2793 */
2794 public void waitTableAvailable(byte[] table, long timeoutMillis)
2795 throws InterruptedException, IOException {
2796 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2797 }
2798
2799 public String explainTableAvailability(TableName tableName) throws IOException {
2800 StringBuilder msg =
2801 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2802 if (getHBaseCluster().getMaster().isAlive()) {
2803 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2804 .getRegionStates().getRegionAssignments();
2805 final List<Pair<RegionInfo, ServerName>> metaLocations =
2806 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2807 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2808 RegionInfo hri = metaLocation.getFirst();
2809 ServerName sn = metaLocation.getSecond();
2810 if (!assignments.containsKey(hri)) {
2811 msg.append(", region ").append(hri)
2812 .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
2813 } else if (sn == null) {
2814 msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
2815 } else if (!sn.equals(assignments.get(hri))) {
2816 msg.append(", region ").append(hri)
2817 .append(" assigned, but has different servers in meta and AM (").append(sn)
2818 .append(" <> ").append(assignments.get(hri)).append(")");
2819 }
2820 }
2821 }
2822 return msg.toString();
2823 }
2824
2825 public String explainTableState(final TableName table, TableState.State state)
2826 throws IOException {
2827 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2828 if (tableState == null) {
2829 return "TableState in META: No table state in META for table " + table
2830 + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2831 } else if (!tableState.inStates(state)) {
2832 return "TableState in META: Not " + state + " state, but " + tableState;
2833 } else {
2834 return "TableState in META: OK";
2835 }
2836 }
2837
2838 @Nullable
2839 public TableState findLastTableState(final TableName table) throws IOException {
2840 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2841 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2842 @Override
2843 public boolean visit(Result r) throws IOException {
2844 if (!Arrays.equals(r.getRow(), table.getName())) {
2845 return false;
2846 }
2847 TableState state = CatalogFamilyFormat.getTableState(r);
2848 if (state != null) {
2849 lastTableState.set(state);
2850 }
2851 return true;
2852 }
2853 };
2854 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2855 Integer.MAX_VALUE, visitor);
2856 return lastTableState.get();
2857 }
2858
2859 /**
2860 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2861 * regions have all been assigned. Will timeout after the default period (30 seconds). Tolerates
2862 * nonexistent tables.
2863 * @param table the table to wait on.
2864 * @throws InterruptedException if interrupted while waiting
2865 * @throws IOException if an IO problem is encountered
2866 */
2867 public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2868 waitTableEnabled(table, 30000);
2869 }
2870
2871 /**
2872 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
2873 * regions have all been assigned.
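* <p>
* For example (an illustrative sketch; {@code tn} is a hypothetical TableName):
* <pre>{@code
* util.getAdmin().enableTable(tn);
* util.waitTableEnabled(tn.getName(), 30000);
* }</pre>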
* @see #waitTableEnabled(TableName, long) 2875 * @param table Table to wait on. 2876 * @param timeoutMillis Time to wait on it being marked enabled. 2877 */ 2878 public void waitTableEnabled(byte[] table, long timeoutMillis) 2879 throws InterruptedException, IOException { 2880 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2881 } 2882 2883 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2884 waitFor(timeoutMillis, predicateTableEnabled(table)); 2885 } 2886 2887 /** 2888 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will 2889 * time out after the default period (30 seconds). 2890 * @param table Table to wait on. 2891 */ 2892 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2893 waitTableDisabled(table, 30000); 2894 } 2895 2896 public void waitTableDisabled(TableName table, long millisTimeout) 2897 throws InterruptedException, IOException { 2898 waitFor(millisTimeout, predicateTableDisabled(table)); 2899 } 2900 2901 /** 2902 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. 2903 * @param table Table to wait on. 2904 * @param timeoutMillis Time to wait on it being marked disabled. 2905 */ 2906 public void waitTableDisabled(byte[] table, long timeoutMillis) 2907 throws InterruptedException, IOException { 2908 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2909 } 2910 2911 /** 2912 * Make sure that at least the specified number of region servers are running. 2913 * @param num minimum number of region servers that should be running 2914 * @return true if we started some servers 2915 */ 2916 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2917 boolean startedServer = false; 2918 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2919 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2920 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2921 startedServer = true; 2922 } 2923 2924 return startedServer; 2925 } 2926 2927 /** 2928 * Make sure that at least the specified number of region servers are running. We don't count the 2929 * ones that are currently stopping or are stopped. 2930 * @param num minimum number of region servers that should be running 2931 * @return true if we started some servers 2932 */ 2933 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2934 boolean startedServer = ensureSomeRegionServersAvailable(num); 2935 2936 int nonStoppedServers = 0; 2937 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2938 2939 HRegionServer hrs = rst.getRegionServer(); 2940 if (hrs.isStopping() || hrs.isStopped()) { 2941 LOG.info("A region server is stopped or stopping: " + hrs); 2942 } else { 2943 nonStoppedServers++; 2944 } 2945 } 2946 for (int i = nonStoppedServers; i < num; ++i) { 2947 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2948 startedServer = true; 2949 } 2950 return startedServer; 2951 } 2952 2953 /** 2954 * This method clones the passed <code>c</code> configuration, setting a new user into the clone. 2955 * Use it when getting new instances of FileSystem. Only works for DistributedFileSystem w/o 2956 * Kerberos. 2956 * @param c Initial configuration 2957 * @param differentiatingSuffix Suffix to differentiate this user from others. 2958 * @return A new User (with the differentiating suffix in its name) for getting new FileSystem instances, or the current user when the filesystem is not a DistributedFileSystem or security is enabled.
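 *         A minimal usage sketch (the suffix is illustrative): act as the returned user via
 *         {@code User#runAs}, e.g.
 *         {@code getDifferentUser(conf, ".dn1").runAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf))}.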
2959 */ 2960 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2961 throws IOException { 2962 FileSystem currentfs = FileSystem.get(c); 2963 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2964 return User.getCurrent(); 2965 } 2966 // Else distributed filesystem. Make a new instance per daemon. Below 2967 // code is taken from the AppendTestUtil over in hdfs. 2968 String username = User.getCurrent().getName() + differentiatingSuffix; 2969 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2970 return user; 2971 } 2972 2973 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2974 throws IOException { 2975 NavigableSet<String> online = new TreeSet<>(); 2976 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2977 try { 2978 for (RegionInfo region : ProtobufUtil 2979 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2980 online.add(region.getRegionNameAsString()); 2981 } 2982 } catch (RegionServerStoppedException e) { 2983 // That's fine. 2984 } 2985 } 2986 return online; 2987 } 2988 2989 /** 2990 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it's hard-coded to 5 and makes tests 2991 * linger. Here is the exception you'll see: 2992 * 2993 * <pre> 2994 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2995 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2996 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2997 * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2998 * </pre> 2999 * 3000 * @param stream A DFSClient.DFSOutputStream. 3001 */ 3002 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 3003 try { 3004 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 3005 for (Class<?> clazz : clazzes) { 3006 String className = clazz.getSimpleName(); 3007 if (className.equals("DFSOutputStream")) { 3008 if (clazz.isInstance(stream)) { 3009 Field maxRecoveryErrorCountField = 3010 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 3011 maxRecoveryErrorCountField.setAccessible(true); 3012 maxRecoveryErrorCountField.setInt(stream, max); 3013 break; 3014 } 3015 } 3016 } 3017 } catch (Exception e) { 3018 LOG.info("Could not set max recovery field", e); 3019 } 3020 } 3021 3022 /** 3023 * Uses the assignment manager directly to assign the region, and waits until the specified region 3024 * has completed assignment. 3025 * @return true if the region is assigned, false otherwise. 3026 */ 3027 public boolean assignRegion(final RegionInfo regionInfo) 3028 throws IOException, InterruptedException { 3029 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3030 am.assign(regionInfo); 3031 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3032 } 3033 3034 /** 3035 * Move a region to the destination server and wait until the region is completely moved and 3036 * online. 3036 * @param destRegion region to move 3037 * @param destServer destination server of the region 3038 */ 3039 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3040 throws InterruptedException, IOException { 3041 HMaster master = getMiniHBaseCluster().getMaster(); 3042 // TODO: Here we start the move. The move can take a while.
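// Note: Admin#move only initiates the move; it is asynchronous. The loop below polls the
// AssignmentManager's view of the region's location until it reports destServer, and then
// double-checks the region server's own online list via assertRegionOnServer.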
getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3044 while (true) { 3045 ServerName serverName = 3046 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3047 if (serverName != null && serverName.equals(destServer)) { 3048 assertRegionOnServer(destRegion, serverName, 2000); 3049 break; 3050 } 3051 Thread.sleep(10); 3052 } 3053 } 3054 3055 /** 3056 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3057 * configurable timeout value (default is 60 seconds). This means all regions have been deployed, 3058 * the master has been informed, and hbase:meta has been updated with each region's server. 3059 * @param tableName the table name 3060 */ 3061 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3062 waitUntilAllRegionsAssigned(tableName, 3063 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3064 } 3065 3066 /** 3067 * Wait until all of the system tables' regions (hbase:meta) get assigned 3068 */ 3069 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3070 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3071 } 3072 3073 /** 3074 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3075 * timeout. This means all regions have been deployed, the master has been informed, and 3076 * hbase:meta has been updated with each region's server. 3077 * @param tableName the table name 3078 * @param timeout timeout, in milliseconds 3079 */ 3080 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3081 throws IOException { 3082 if (!TableName.isMetaTableName(tableName)) { 3083 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3084 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " 3085 + timeout + "ms"); 3086 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3087 @Override 3088 public String explainFailure() throws IOException { 3089 return explainTableAvailability(tableName); 3090 } 3091 3092 @Override 3093 public boolean evaluate() throws IOException { 3094 Scan scan = new Scan(); 3095 scan.addFamily(HConstants.CATALOG_FAMILY); 3096 boolean tableFound = false; 3097 try (ResultScanner s = meta.getScanner(scan)) { 3098 for (Result r; (r = s.next()) != null;) { 3099 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3100 RegionInfo info = RegionInfo.parseFromOrNull(b); 3101 if (info != null && info.getTable().equals(tableName)) { 3102 // Get server hosting this region from catalog family. Return false if no server 3103 // hosting this region, or if the server hosting this region was recently killed 3104 // (for fault tolerance testing).
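// The ServerName is rebuilt from the info:server ("host:port") and info:serverstartcode
// columns; ServerName.valueOf expects "host,port,startcode", hence the replaceFirst below.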
tableFound = true; 3106 byte[] server = 3107 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3108 if (server == null) { 3109 return false; 3110 } else { 3111 byte[] startCode = 3112 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3113 ServerName serverName = 3114 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3115 + Bytes.toLong(startCode)); 3116 if ( 3117 !getHBaseClusterInterface().isDistributedCluster() 3118 && getHBaseCluster().isKilledRS(serverName) 3119 ) { 3120 return false; 3121 } 3122 } 3123 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3124 return false; 3125 } 3126 } 3127 } 3128 } 3129 if (!tableFound) { 3130 LOG.warn( 3131 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3132 } 3133 return tableFound; 3134 } 3135 }); 3136 } 3137 } 3138 LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states."); 3139 // check from the master state if we are using a mini cluster 3140 if (!getHBaseClusterInterface().isDistributedCluster()) { 3141 // So, all regions are in the meta table but make sure master knows of the assignments before 3142 // returning -- sometimes this can lag. 3143 HMaster master = getHBaseCluster().getMaster(); 3144 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3145 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3146 @Override 3147 public String explainFailure() throws IOException { 3148 return explainTableAvailability(tableName); 3149 } 3150 3151 @Override 3152 public boolean evaluate() throws IOException { 3153 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3154 return hris != null && !hris.isEmpty(); 3155 } 3156 }); 3157 } 3158 LOG.info("All regions for table " + tableName + " assigned."); 3159 } 3160 3161 /** 3162 * Do a small get/scan against one store. This is required because the store has no actual methods 3163 * for querying itself, and relies on StoreScanner. 3164 */ 3165 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3166 Scan scan = new Scan(get); 3167 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3168 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3169 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3170 // readpoint 0. 3171 0); 3172 3173 List<Cell> result = new ArrayList<>(); 3174 scanner.next(result); 3175 if (!result.isEmpty()) { 3176 // verify that we are on the row we want: 3177 Cell kv = result.get(0); 3178 if (!CellUtil.matchingRows(kv, get.getRow())) { 3179 result.clear(); 3180 } 3181 } 3182 scanner.close(); 3183 return result; 3184 } 3185 3186 /** 3187 * Create region split keys between startKey and endKey 3188 * @param numRegions the number of regions to be created; it has to be greater than 3. 3189 * @return resulting split keys 3190 */ 3191 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3192 assertTrue(numRegions > 3); 3193 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3194 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3195 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3196 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3197 return result; 3198 } 3199 3200 /** 3201 * Do a small get/scan against one store. This is required because the store has no actual methods 3202 * for querying itself, and relies on StoreScanner.
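 * <p>
 * A minimal sketch (the row and qualifier are illustrative):
 *
 * <pre>
 * NavigableSet&lt;byte[]&gt; cols = new TreeSet&lt;&gt;(Bytes.BYTES_COMPARATOR);
 * cols.add(Bytes.toBytes("qual"));
 * List&lt;Cell&gt; cells = getFromStoreFile(store, Bytes.toBytes("row1"), cols);
 * </pre>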
3203 */ 3204 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3205 throws IOException { 3206 Get get = new Get(row); 3207 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3208 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3209 3210 return getFromStoreFile(store, get); 3211 } 3212 3213 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3214 final List<? extends Cell> actual) { 3215 final int eLen = expected.size(); 3216 final int aLen = actual.size(); 3217 final int minLen = Math.min(eLen, aLen); 3218 3219 int i = 0; 3220 while ( 3221 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3222 ) { 3223 i++; 3224 } 3225 3226 if (additionalMsg == null) { 3227 additionalMsg = ""; 3228 } 3229 if (!additionalMsg.isEmpty()) { 3230 additionalMsg = ". " + additionalMsg; 3231 } 3232 3233 if (eLen != aLen || i != minLen) { 3234 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3235 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i) 3236 + " (length " + aLen + ")" + additionalMsg); 3237 } 3238 } 3239 3240 public static <T> String safeGetAsStr(List<T> lst, int i) { 3241 if (0 <= i && i < lst.size()) { 3242 return lst.get(i).toString(); 3243 } else { 3244 return "<out_of_range>"; 3245 } 3246 } 3247 3248 public String getRpcConnnectionURI() throws UnknownHostException { 3249 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3250 } 3251 3252 public String getZkConnectionURI() { 3253 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3254 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3255 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3256 } 3257 3258 /** 3259 * Get the zk based cluster key for this cluster. 3260 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use a connection URI to specify the 3261 * connection info of a cluster. Kept here only for compatibility.
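 * <p>
 * The returned key has the form {@code quorum:clientPort:znodeParent}, e.g.
 * {@code localhost:2181:/hbase} with default settings (the values shown are illustrative).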
3262 * @see #getRpcConnnectionURI() 3263 * @see #getZkConnectionURI() 3264 */ 3265 @Deprecated 3266 public String getClusterKey() { 3267 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3268 + ":" 3269 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3270 } 3271 3272 /** 3273 * Creates a random table with the given parameters 3274 */ 3275 public Table createRandomTable(TableName tableName, final Collection<String> families, 3276 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3277 final int numRowsPerFlush) throws IOException, InterruptedException { 3278 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3279 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3280 + maxVersions + "\n"); 3281 3282 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3283 final int numCF = families.size(); 3284 final byte[][] cfBytes = new byte[numCF][]; 3285 { 3286 int cfIndex = 0; 3287 for (String cf : families) { 3288 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3289 } 3290 } 3291 3292 final int actualStartKey = 0; 3293 final int actualEndKey = Integer.MAX_VALUE; 3294 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3295 final int splitStartKey = actualStartKey + keysPerRegion; 3296 final int splitEndKey = actualEndKey - keysPerRegion; 3297 final String keyFormat = "%08x"; 3298 final Table table = createTable(tableName, cfBytes, maxVersions, 3299 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3300 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3301 3302 if (hbaseCluster != null) { 3303 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3304 } 3305 3306 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3307 3308 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3309 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3310 final byte[] row = Bytes.toBytes( 3311 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3312 3313 Put put = new Put(row); 3314 Delete del = new Delete(row); 3315 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3316 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3317 final long ts = rand.nextInt(); 3318 final byte[] qual = Bytes.toBytes("col" + iCol); 3319 if (rand.nextBoolean()) { 3320 final byte[] value = 3321 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3322 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3323 put.addColumn(cf, qual, ts, value); 3324 } else if (rand.nextDouble() < 0.8) { 3325 del.addColumn(cf, qual, ts); 3326 } else { 3327 del.addColumns(cf, qual, ts); 3328 } 3329 } 3330 3331 if (!put.isEmpty()) { 3332 mutator.mutate(put); 3333 } 3334 3335 if (!del.isEmpty()) { 3336 mutator.mutate(del); 3337 } 3338 } 3339 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3340 mutator.flush(); 3341 if (hbaseCluster != null) { 3342 getMiniHBaseCluster().flushcache(table.getName()); 3343 } 3344 } 3345 mutator.close(); 3346 3347 return table; 3348 } 3349 3350 public static int randomFreePort() { 3351 return HBaseCommonTestingUtil.randomFreePort(); 3352 } 3353 3354 public static String randomMultiCastAddress() { 3355 return "226.1.1." 
+ ThreadLocalRandom.current().nextInt(254); 3356 } 3357 3358 public static void waitForHostPort(String host, int port) throws IOException { 3359 final int maxTimeMs = 10000; 3360 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3361 IOException savedException = null; 3362 LOG.info("Waiting for server at " + host + ":" + port); 3363 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3364 try { 3365 Socket sock = new Socket(InetAddress.getByName(host), port); 3366 sock.close(); 3367 savedException = null; 3368 LOG.info("Server at " + host + ":" + port + " is available"); 3369 break; 3370 } catch (UnknownHostException e) { 3371 throw new IOException("Failed to look up " + host, e); 3372 } catch (IOException e) { 3373 savedException = e; 3374 } 3375 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3376 } 3377 3378 if (savedException != null) { 3379 throw savedException; 3380 } 3381 } 3382 3383 public static int getMetaRSPort(Connection connection) throws IOException { 3384 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3385 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3386 } 3387 } 3388 3389 /** 3390 * Due to an async racing issue, a region may not yet be in a region server's online region list 3391 * even after the assignment znode is deleted and the new assignment is recorded in the master. 3392 */ 3393 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3394 final long timeout) throws IOException, InterruptedException { 3395 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3396 while (true) { 3397 List<RegionInfo> regions = getAdmin().getRegions(server); 3398 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3399 long now = EnvironmentEdgeManager.currentTime(); 3400 if (now > timeoutTime) break; 3401 Thread.sleep(10); 3402 } 3403 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3404 } 3405 3406 /** 3407 * Check to make sure the region is open on the specified region server, but not on any other one.
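 * <p>
 * A sketch of typical use after a region move (the timeout is illustrative):
 *
 * <pre>
 * moveRegionAndWait(hri, destServer);
 * assertRegionOnlyOnServer(hri, destServer, 30000); // fails if the region is double assigned
 * </pre>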
3408 */ 3409 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3410 final long timeout) throws IOException, InterruptedException { 3411 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3412 while (true) { 3413 List<RegionInfo> regions = getAdmin().getRegions(server); 3414 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3415 List<JVMClusterUtil.RegionServerThread> rsThreads = 3416 getHBaseCluster().getLiveRegionServerThreads(); 3417 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3418 HRegionServer rs = rsThread.getRegionServer(); 3419 if (server.equals(rs.getServerName())) { 3420 continue; 3421 } 3422 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3423 for (HRegion r : hrs) { 3424 assertTrue(r.getRegionInfo().getRegionId() != hri.getRegionId(), 3425 "Region should not be double assigned"); 3426 } 3427 } 3428 return; // good, we are happy 3429 } 3430 long now = EnvironmentEdgeManager.currentTime(); 3431 if (now > timeoutTime) break; 3432 Thread.sleep(10); 3433 } 3434 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3435 } 3436 3437 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException { 3438 TableDescriptor td = 3439 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3440 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3441 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3442 } 3443 3444 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3445 BlockCache blockCache) throws IOException { 3446 TableDescriptor td = 3447 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3448 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3449 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3450 } 3451 3452 public static void setFileSystemURI(String fsURI) { 3453 FS_URI = fsURI; 3454 } 3455 3456 /** 3457 * Returns a {@link Predicate} for checking that there are no regions in transition in the master 3458 */ 3459 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3460 return new ExplainingPredicate<IOException>() { 3461 @Override 3462 public String explainFailure() throws IOException { 3463 final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager(); 3464 return "found in transition: " + am.getRegionsInTransition().toString(); 3465 } 3466 3467 @Override 3468 public boolean evaluate() throws IOException { 3469 HMaster master = getMiniHBaseCluster().getMaster(); 3470 if (master == null) return false; 3471 AssignmentManager am = master.getAssignmentManager(); 3472 if (am == null) return false; 3473 return !am.hasRegionsInTransition(); 3474 } 3475 }; 3476 } 3477 3478 /** 3479 * Returns a {@link Predicate} for checking that there are no procedures scheduled for region 3480 * transitions in the master 3481 */ 3482 public ExplainingPredicate<IOException> predicateNoRegionTransitScheduled() { 3483 return new ExplainingPredicate<IOException>() { 3484 @Override 3485 public String explainFailure() throws IOException { 3486 final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager(); 3487 return "Number of procedures scheduled for region transit: " 3488 + am.getRegionTransitScheduledCount(); 3489 } 3490 3491 @Override 3492 public
boolean evaluate() throws IOException { 3493 HMaster master = getMiniHBaseCluster().getMaster(); 3494 if (master == null) { 3495 return false; 3496 } 3497 AssignmentManager am = master.getAssignmentManager(); 3498 if (am == null) { 3499 return false; 3500 } 3501 return am.getRegionTransitScheduledCount() == 0; 3502 } 3503 }; 3504 } 3505 3506 /** 3507 * Returns a {@link Predicate} for checking that table is enabled 3508 */ 3509 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3510 return new ExplainingPredicate<IOException>() { 3511 @Override 3512 public String explainFailure() throws IOException { 3513 return explainTableState(tableName, TableState.State.ENABLED); 3514 } 3515 3516 @Override 3517 public boolean evaluate() throws IOException { 3518 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3519 } 3520 }; 3521 } 3522 3523 /** 3524 * Returns a {@link Predicate} for checking that table is disabled 3525 */ 3526 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3527 return new ExplainingPredicate<IOException>() { 3528 @Override 3529 public String explainFailure() throws IOException { 3530 return explainTableState(tableName, TableState.State.DISABLED); 3531 } 3532 3533 @Override 3534 public boolean evaluate() throws IOException { 3535 return getAdmin().isTableDisabled(tableName); 3536 } 3537 }; 3538 } 3539 3540 /** 3541 * Returns a {@link Predicate} for checking that table is available 3542 */ 3543 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3544 return new ExplainingPredicate<IOException>() { 3545 @Override 3546 public String explainFailure() throws IOException { 3547 return explainTableAvailability(tableName); 3548 } 3549 3550 @Override 3551 public boolean evaluate() throws IOException { 3552 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3553 if (tableAvailable) { 3554 try (Table table = getConnection().getTable(tableName)) { 3555 TableDescriptor htd = table.getDescriptor(); 3556 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3557 .getAllRegionLocations()) { 3558 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3559 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3560 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3561 for (byte[] family : htd.getColumnFamilyNames()) { 3562 scan.addFamily(family); 3563 } 3564 try (ResultScanner scanner = table.getScanner(scan)) { 3565 scanner.next(); 3566 } 3567 } 3568 } 3569 } 3570 return tableAvailable; 3571 } 3572 }; 3573 } 3574 3575 /** 3576 * Wait until no regions in transition. 3577 * @param timeout How long to wait. 3578 */ 3579 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3580 waitFor(timeout, predicateNoRegionsInTransition()); 3581 } 3582 3583 /** 3584 * Wait until no region transit (TRSP) procedures are scheduled. 3585 * @param timeout How long to wait. 3586 */ 3587 public void waitUntilNoRegionTransitScheduled(final long timeout) throws IOException { 3588 waitFor(timeout, predicateNoRegionTransitScheduled()); 3589 } 3590 3591 /** 3592 * Wait until no regions in transition.
(time limit: 15 minutes) 3593 */ 3594 public void waitUntilNoRegionsInTransition() throws IOException { 3595 waitUntilNoRegionsInTransition(15 * 60000); 3596 } 3597 3598 /** 3599 * Wait until no TransitRegionStateProcedure (TRSP) is present (time limit: 15 minutes) 3600 */ 3601 public void waitUntilNoRegionTransitScheduled() throws IOException { 3602 waitUntilNoRegionTransitScheduled(15 * 60000); 3603 } 3604 3605 /** 3606 * Wait until the given labels are ready in VisibilityLabelsCache. 3607 */ 3608 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3609 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3610 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3611 3612 @Override 3613 public boolean evaluate() { 3614 for (String label : labels) { 3615 if (labelsCache.getLabelOrdinal(label) == 0) { 3616 return false; 3617 } 3618 } 3619 return true; 3620 } 3621 3622 @Override 3623 public String explainFailure() { 3624 for (String label : labels) { 3625 if (labelsCache.getLabelOrdinal(label) == 0) { 3626 return label + " is not available yet"; 3627 } 3628 } 3629 return ""; 3630 } 3631 }); 3632 } 3633 3634 /** 3635 * Create a set of column descriptors with all combinations of the available compression, 3636 * encoding, and bloom filter types. 3637 * @return the list of column descriptors 3638 */ 3639 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3640 return generateColumnDescriptors(""); 3641 } 3642 3643 /** 3644 * Create a set of column descriptors with all combinations of the available compression, 3645 * encoding, and bloom filter types. 3646 * @param prefix family names prefix 3647 * @return the list of column descriptors 3648 */ 3649 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3650 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3651 long familyId = 0; 3652 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3653 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3654 for (BloomType bloomType : BloomType.values()) { 3655 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3656 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3657 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3658 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3659 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3660 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3661 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3662 familyId++; 3663 } 3664 } 3665 } 3666 return columnFamilyDescriptors; 3667 } 3668 3669 /** 3670 * Get supported compression algorithms. 3671 * @return supported compression algorithms.
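 *         <p>
 *         A sketch of typical use (the family name is illustrative): build one column family per
 *         supported codec, e.g.
 *
 * <pre>
 * for (Compression.Algorithm algo : getSupportedCompressionAlgorithms()) {
 *   ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f"))
 *     .setCompressionType(algo).build();
 *   // create a table with cfd and verify that reads/writes round-trip under this codec
 * }
 * </pre>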
3672 */ 3673 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3674 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3675 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3676 for (String algoName : allAlgos) { 3677 try { 3678 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3679 algo.getCompressor(); 3680 supportedAlgos.add(algo); 3681 } catch (Throwable t) { 3682 // this algo is not available 3683 } 3684 } 3685 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3686 } 3687 3688 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3689 Scan scan = new Scan().withStartRow(row); 3690 scan.setReadType(ReadType.PREAD); 3691 scan.setCaching(1); 3692 scan.setReversed(true); 3693 scan.addFamily(family); 3694 try (RegionScanner scanner = r.getScanner(scan)) { 3695 List<Cell> cells = new ArrayList<>(1); 3696 scanner.next(cells); 3697 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3698 return null; 3699 } 3700 return Result.create(cells); 3701 } 3702 } 3703 3704 private boolean isTargetTable(final byte[] inRow, Cell c) { 3705 String inputRowString = Bytes.toString(inRow); 3706 int i = inputRowString.indexOf(HConstants.DELIMITER); 3707 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3708 int o = outputRowString.indexOf(HConstants.DELIMITER); 3709 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3710 } 3711 3712 /** 3713 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3714 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3715 * kerby KDC server and utility for using it, 3716 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; it 3717 * carries less baggage. It was introduced in HBASE-5291. 3718 */ 3719 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3720 Properties conf = MiniKdc.createConf(); 3721 conf.put(MiniKdc.DEBUG, true); 3722 MiniKdc kdc = null; 3723 File dir = null; 3724 // There is a time lag between selecting a port and trying to bind to it. It's possible that 3725 // another service captures the port in between, which will result in a BindException. 3726 boolean bindException; 3727 int numTries = 0; 3728 do { 3729 try { 3730 bindException = false; 3731 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3732 kdc = new MiniKdc(conf, dir); 3733 kdc.start(); 3734 } catch (BindException e) { 3735 FileUtils.deleteDirectory(dir); // clean directory 3736 numTries++; 3737 if (numTries == 3) { 3738 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3739 throw e; 3740 } 3741 LOG.error("BindException encountered when setting up MiniKdc.
Trying again."); 3742 bindException = true; 3743 } 3744 } while (bindException); 3745 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3746 return kdc; 3747 } 3748 3749 public int getNumHFiles(final TableName tableName, final byte[] family) { 3750 int numHFiles = 0; 3751 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3752 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3753 } 3754 return numHFiles; 3755 } 3756 3757 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3758 final byte[] family) { 3759 int numHFiles = 0; 3760 for (Region region : rs.getRegions(tableName)) { 3761 numHFiles += region.getStore(family).getStorefilesCount(); 3762 } 3763 return numHFiles; 3764 } 3765 3766 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3767 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3768 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3769 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3770 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3771 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3772 it2 = rtdFamilies.iterator(); it.hasNext();) { 3773 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3774 } 3775 } 3776 3777 /** 3778 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3779 * invocations. 3780 */ 3781 public static void await(final long sleepMillis, final BooleanSupplier condition) 3782 throws InterruptedException { 3783 try { 3784 while (!condition.getAsBoolean()) { 3785 Thread.sleep(sleepMillis); 3786 } 3787 } catch (RuntimeException e) { 3788 if (e.getCause() instanceof AssertionError) { 3789 throw (AssertionError) e.getCause(); 3790 } 3791 throw e; 3792 } 3793 } 3794 3795 public void createRegionDir(RegionInfo hri) throws IOException { 3796 Path rootDir = getDataTestDir(); 3797 Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable()); 3798 Path regionDir = new Path(tableDir, hri.getEncodedName()); 3799 FileSystem fs = getTestFileSystem(); 3800 if (!fs.exists(regionDir)) { 3801 fs.mkdirs(regionDir); 3802 } 3803 } 3804 3805 public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem) 3806 throws IOException { 3807 Path tableDir = 3808 CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable()); 3809 HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir, 3810 regionInfo); 3811 } 3812}