001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.Closeable; 026import java.io.File; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.io.UncheckedIOException; 030import java.lang.reflect.Field; 031import java.lang.reflect.Modifier; 032import java.net.BindException; 033import java.net.DatagramSocket; 034import java.net.InetAddress; 035import java.net.ServerSocket; 036import java.net.Socket; 037import java.net.UnknownHostException; 038import java.nio.charset.StandardCharsets; 039import java.security.MessageDigest; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Collection; 043import java.util.Collections; 044import java.util.HashSet; 045import java.util.Iterator; 046import java.util.List; 047import java.util.Map; 048import java.util.NavigableSet; 049import java.util.Properties; 050import java.util.Random; 051import java.util.Set; 052import java.util.TreeSet; 053import 
java.util.concurrent.ExecutionException; 054import java.util.concurrent.ThreadLocalRandom; 055import java.util.concurrent.TimeUnit; 056import java.util.concurrent.atomic.AtomicReference; 057import java.util.function.BooleanSupplier; 058import org.apache.commons.io.FileUtils; 059import org.apache.commons.lang3.RandomStringUtils; 060import org.apache.hadoop.conf.Configuration; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 064import org.apache.hadoop.hbase.Waiter.Predicate; 065import org.apache.hadoop.hbase.client.Admin; 066import org.apache.hadoop.hbase.client.AsyncAdmin; 067import org.apache.hadoop.hbase.client.AsyncClusterConnection; 068import org.apache.hadoop.hbase.client.BufferedMutator; 069import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.Connection; 073import org.apache.hadoop.hbase.client.ConnectionFactory; 074import org.apache.hadoop.hbase.client.Consistency; 075import org.apache.hadoop.hbase.client.Delete; 076import org.apache.hadoop.hbase.client.Durability; 077import org.apache.hadoop.hbase.client.Get; 078import org.apache.hadoop.hbase.client.Hbck; 079import org.apache.hadoop.hbase.client.MasterRegistry; 080import org.apache.hadoop.hbase.client.Put; 081import org.apache.hadoop.hbase.client.RegionInfo; 082import org.apache.hadoop.hbase.client.RegionInfoBuilder; 083import org.apache.hadoop.hbase.client.RegionLocator; 084import org.apache.hadoop.hbase.client.Result; 085import org.apache.hadoop.hbase.client.ResultScanner; 086import org.apache.hadoop.hbase.client.Scan; 087import org.apache.hadoop.hbase.client.Scan.ReadType; 088import org.apache.hadoop.hbase.client.Table; 089import org.apache.hadoop.hbase.client.TableDescriptor; 090import 
org.apache.hadoop.hbase.client.TableDescriptorBuilder; 091import org.apache.hadoop.hbase.client.TableState; 092import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 093import org.apache.hadoop.hbase.fs.HFileSystem; 094import org.apache.hadoop.hbase.io.compress.Compression; 095import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 096import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 097import org.apache.hadoop.hbase.io.hfile.BlockCache; 098import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 099import org.apache.hadoop.hbase.io.hfile.HFile; 100import org.apache.hadoop.hbase.ipc.RpcServerInterface; 101import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 102import org.apache.hadoop.hbase.master.HMaster; 103import org.apache.hadoop.hbase.master.RegionState; 104import org.apache.hadoop.hbase.master.ServerManager; 105import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 106import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 107import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 108import org.apache.hadoop.hbase.master.assignment.RegionStates; 109import org.apache.hadoop.hbase.mob.MobFileCache; 110import org.apache.hadoop.hbase.regionserver.BloomType; 111import org.apache.hadoop.hbase.regionserver.ChunkCreator; 112import org.apache.hadoop.hbase.regionserver.HRegion; 113import org.apache.hadoop.hbase.regionserver.HRegionServer; 114import org.apache.hadoop.hbase.regionserver.HStore; 115import org.apache.hadoop.hbase.regionserver.InternalScanner; 116import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 117import org.apache.hadoop.hbase.regionserver.Region; 118import org.apache.hadoop.hbase.regionserver.RegionScanner; 119import org.apache.hadoop.hbase.regionserver.RegionServerServices; 120import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 121import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 122import org.apache.hadoop.hbase.security.User; 123import 
org.apache.hadoop.hbase.security.UserProvider; 124import org.apache.hadoop.hbase.security.access.AccessController; 125import org.apache.hadoop.hbase.security.access.PermissionStorage; 126import org.apache.hadoop.hbase.security.access.SecureTestUtil; 127import org.apache.hadoop.hbase.security.token.TokenProvider; 128import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 129import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 130import org.apache.hadoop.hbase.util.Bytes; 131import org.apache.hadoop.hbase.util.CommonFSUtils; 132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 133import org.apache.hadoop.hbase.util.FSUtils; 134import org.apache.hadoop.hbase.util.JVM; 135import org.apache.hadoop.hbase.util.JVMClusterUtil; 136import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 137import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 138import org.apache.hadoop.hbase.util.Pair; 139import org.apache.hadoop.hbase.util.ReflectionUtils; 140import org.apache.hadoop.hbase.util.RetryCounter; 141import org.apache.hadoop.hbase.util.Threads; 142import org.apache.hadoop.hbase.wal.WAL; 143import org.apache.hadoop.hbase.wal.WALFactory; 144import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 145import org.apache.hadoop.hbase.zookeeper.ZKConfig; 146import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 147import org.apache.hadoop.hdfs.DFSClient; 148import org.apache.hadoop.hdfs.DistributedFileSystem; 149import org.apache.hadoop.hdfs.MiniDFSCluster; 150import org.apache.hadoop.hdfs.server.datanode.DataNode; 151import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 152import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 153import org.apache.hadoop.mapred.JobConf; 154import org.apache.hadoop.mapred.MiniMRCluster; 155import org.apache.hadoop.mapred.TaskLog; 156import org.apache.hadoop.minikdc.MiniKdc; 157import org.apache.yetus.audience.InterfaceAudience; 158import 
org.apache.yetus.audience.InterfaceStability; 159import org.apache.zookeeper.WatchedEvent; 160import org.apache.zookeeper.ZooKeeper; 161import org.apache.zookeeper.ZooKeeper.States; 162 163import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 164 165import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 166 167/** 168 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 169 * functionality. Create an instance and keep it around testing HBase. 170 * <p/> 171 * This class is meant to be your one-stop shop for anything you might need testing. Manages one 172 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster}, 173 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real 174 * cluster. 175 * <p/> 176 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration. 177 * <p/> 178 * It does not set logging levels. 179 * <p/> 180 * In the configuration properties, default values for master-info-port and region-server-port are 181 * overridden such that a random port will be assigned (thus avoiding port contention if another 182 * local HBase instance is already running). 183 * <p/> 184 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" 185 * setting it to true. 186 */ 187@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 188@InterfaceStability.Evolving 189public class HBaseTestingUtil extends HBaseZKTestingUtil { 190 191 public static final int DEFAULT_REGIONS_PER_SERVER = 3; 192 193 private MiniDFSCluster dfsCluster = null; 194 private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null; 195 196 private volatile HBaseClusterInterface hbaseCluster = null; 197 private MiniMRCluster mrCluster = null; 198 199 /** If there is a mini cluster running for this testing utility instance. 
*/ 200 private volatile boolean miniClusterRunning; 201 202 private String hadoopLogDir; 203 204 /** 205 * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility 206 */ 207 private Path dataTestDirOnTestFS = null; 208 209 private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>(); 210 211 /** Filesystem URI used for map-reduce mini-cluster setup */ 212 private static String FS_URI; 213 214 /** This is for unit tests parameterized with a single boolean. */ 215 public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination(); 216 217 /** 218 * Checks to see if a specific port is available. 219 * @param port the port number to check for availability 220 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 221 */ 222 public static boolean available(int port) { 223 ServerSocket ss = null; 224 DatagramSocket ds = null; 225 try { 226 ss = new ServerSocket(port); 227 ss.setReuseAddress(true); 228 ds = new DatagramSocket(port); 229 ds.setReuseAddress(true); 230 return true; 231 } catch (IOException e) { 232 // Do nothing 233 } finally { 234 if (ds != null) { 235 ds.close(); 236 } 237 238 if (ss != null) { 239 try { 240 ss.close(); 241 } catch (IOException e) { 242 /* should not be thrown */ 243 } 244 } 245 } 246 247 return false; 248 } 249 250 /** 251 * Create all combinations of Bloom filters and compression algorithms for testing. 
252 */ 253 private static List<Object[]> bloomAndCompressionCombinations() { 254 List<Object[]> configurations = new ArrayList<>(); 255 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 256 for (BloomType bloomType : BloomType.values()) { 257 configurations.add(new Object[] { comprAlgo, bloomType }); 258 } 259 } 260 return Collections.unmodifiableList(configurations); 261 } 262 263 /** 264 * Create combination of memstoreTS and tags 265 */ 266 private static List<Object[]> memStoreTSAndTagsCombination() { 267 List<Object[]> configurations = new ArrayList<>(); 268 configurations.add(new Object[] { false, false }); 269 configurations.add(new Object[] { false, true }); 270 configurations.add(new Object[] { true, false }); 271 configurations.add(new Object[] { true, true }); 272 return Collections.unmodifiableList(configurations); 273 } 274 275 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 276 List<Object[]> configurations = new ArrayList<>(); 277 configurations.add(new Object[] { false, false, true }); 278 configurations.add(new Object[] { false, false, false }); 279 configurations.add(new Object[] { false, true, true }); 280 configurations.add(new Object[] { false, true, false }); 281 configurations.add(new Object[] { true, false, true }); 282 configurations.add(new Object[] { true, false, false }); 283 configurations.add(new Object[] { true, true, true }); 284 configurations.add(new Object[] { true, true, false }); 285 return Collections.unmodifiableList(configurations); 286 } 287 288 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 289 bloomAndCompressionCombinations(); 290 291 /** 292 * <p> 293 * Create an HBaseTestingUtility using a default configuration. 294 * <p> 295 * Initially, all tmp files are written to a local test data directory. 
Once 296 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 297 * data will be written to the DFS directory instead. 298 */ 299 public HBaseTestingUtil() { 300 this(HBaseConfiguration.create()); 301 } 302 303 /** 304 * <p> 305 * Create an HBaseTestingUtility using a given configuration. 306 * <p> 307 * Initially, all tmp files are written to a local test data directory. Once 308 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 309 * data will be written to the DFS directory instead. 310 * @param conf The configuration to use for further operations 311 */ 312 public HBaseTestingUtil(@Nullable Configuration conf) { 313 super(conf); 314 315 // a hbase checksum verification failure will cause unit tests to fail 316 ChecksumUtil.generateExceptionForChecksumFailureForTest(true); 317 318 // Save this for when setting default file:// breaks things 319 if (this.conf.get("fs.defaultFS") != null) { 320 this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS")); 321 } 322 if (this.conf.get(HConstants.HBASE_DIR) != null) { 323 this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR)); 324 } 325 // Every cluster is a local cluster until we start DFS 326 // Note that conf could be null, but this.conf will not be 327 String dataTestDir = getDataTestDir().toString(); 328 this.conf.set("fs.defaultFS", "file:///"); 329 this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir); 330 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 331 this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 332 // If the value for random ports isn't set set it to true, thus making 333 // tests opt-out for random port assignment 334 this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, 335 this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true)); 336 } 337 338 /** 339 * Close both the region {@code r} and it's underlying WAL. 
For use in tests. 340 */ 341 public static void closeRegionAndWAL(final Region r) throws IOException { 342 closeRegionAndWAL((HRegion) r); 343 } 344 345 /** 346 * Close both the HRegion {@code r} and it's underlying WAL. For use in tests. 347 */ 348 public static void closeRegionAndWAL(final HRegion r) throws IOException { 349 if (r == null) return; 350 r.close(); 351 if (r.getWAL() == null) return; 352 r.getWAL().close(); 353 } 354 355 /** 356 * Start mini secure cluster with given kdc and principals. 357 * @param kdc Mini kdc server 358 * @param servicePrincipal Service principal without realm. 359 * @param spnegoPrincipal Spnego principal without realm. 360 * @return Handler to shutdown the cluster 361 */ 362 public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal, 363 String spnegoPrincipal) throws Exception { 364 Configuration conf = getConfiguration(); 365 366 SecureTestUtil.enableSecurity(conf); 367 VisibilityTestUtil.enableVisiblityLabels(conf); 368 SecureTestUtil.verifyConfiguration(conf); 369 370 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 371 AccessController.class.getName() + ',' + TokenProvider.class.getName()); 372 373 HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(), 374 spnegoPrincipal + '@' + kdc.getRealm()); 375 376 startMiniCluster(); 377 try { 378 waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); 379 } catch (Exception e) { 380 shutdownMiniCluster(); 381 throw e; 382 } 383 384 return this::shutdownMiniCluster; 385 } 386 387 /** 388 * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned 389 * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed 390 * by the Configuration. If say, a Connection was being used against a cluster that had been 391 * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome. 
392 * Rather than use the return direct, its usually best to make a copy and use that. Do 393 * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code> 394 * @return Instance of Configuration. 395 */ 396 @Override 397 public Configuration getConfiguration() { 398 return super.getConfiguration(); 399 } 400 401 public void setHBaseCluster(HBaseClusterInterface hbaseCluster) { 402 this.hbaseCluster = hbaseCluster; 403 } 404 405 /** 406 * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can 407 * have many concurrent tests running if we need to. Moding a System property is not the way to do 408 * concurrent instances -- another instance could grab the temporary value unintentionally -- but 409 * not anything can do about it at moment; single instance only is how the minidfscluster works. 410 * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir 411 * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir 412 * (We do not create them!). 413 * @return The calculated data test build directory, if newly-created. 
414 */ 415 @Override 416 protected Path setupDataTestDir() { 417 Path testPath = super.setupDataTestDir(); 418 if (null == testPath) { 419 return null; 420 } 421 422 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 423 424 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 425 // we want our own value to ensure uniqueness on the same machine 426 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 427 428 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 429 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 430 return testPath; 431 } 432 433 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 434 435 String sysValue = System.getProperty(propertyName); 436 437 if (sysValue != null) { 438 // There is already a value set. So we do nothing but hope 439 // that there will be no conflicts 440 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 441 + " so I do NOT create it in " + parent); 442 String confValue = conf.get(propertyName); 443 if (confValue != null && !confValue.endsWith(sysValue)) { 444 LOG.warn(propertyName + " property value differs in configuration and system: " 445 + "Configuration=" + confValue + " while System=" + sysValue 446 + " Erasing configuration value by system value."); 447 } 448 conf.set(propertyName, sysValue); 449 } else { 450 // Ok, it's not set, so we create it as a subdirectory 451 createSubDir(propertyName, parent, subDirName); 452 System.setProperty(propertyName, conf.get(propertyName)); 453 } 454 } 455 456 /** 457 * @return Where to write test data on the test filesystem; Returns working directory for the test 458 * filesystem by default 459 * @see #setupDataTestDirOnTestFS() 460 * @see #getTestFileSystem() 461 */ 462 private Path getBaseTestDirOnTestFS() throws IOException { 463 FileSystem fs = getTestFileSystem(); 464 return new 
Path(fs.getWorkingDirectory(), "test-data"); 465 } 466 467 /** 468 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 469 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 470 * on it. 471 * @return a unique path in the test filesystem 472 */ 473 public Path getDataTestDirOnTestFS() throws IOException { 474 if (dataTestDirOnTestFS == null) { 475 setupDataTestDirOnTestFS(); 476 } 477 478 return dataTestDirOnTestFS; 479 } 480 481 /** 482 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 483 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 484 * on it. 485 * @return a unique path in the test filesystem 486 * @param subdirName name of the subdir to create under the base test dir 487 */ 488 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 489 return new Path(getDataTestDirOnTestFS(), subdirName); 490 } 491 492 /** 493 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 494 * setup. 495 */ 496 private void setupDataTestDirOnTestFS() throws IOException { 497 if (dataTestDirOnTestFS != null) { 498 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 499 return; 500 } 501 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 502 } 503 504 /** 505 * Sets up a new path in test filesystem to be used by tests. 506 */ 507 private Path getNewDataTestDirOnTestFS() throws IOException { 508 // The file system can be either local, mini dfs, or if the configuration 509 // is supplied externally, it can be an external cluster FS. 
If it is a local 510 // file system, the tests should use getBaseTestDir, otherwise, we can use 511 // the working directory, and create a unique sub dir there 512 FileSystem fs = getTestFileSystem(); 513 Path newDataTestDir; 514 String randomStr = getRandomUUID().toString(); 515 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 516 newDataTestDir = new Path(getDataTestDir(), randomStr); 517 File dataTestDir = new File(newDataTestDir.toString()); 518 if (deleteOnExit()) dataTestDir.deleteOnExit(); 519 } else { 520 Path base = getBaseTestDirOnTestFS(); 521 newDataTestDir = new Path(base, randomStr); 522 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 523 } 524 return newDataTestDir; 525 } 526 527 /** 528 * Cleans the test data directory on the test filesystem. 529 * @return True if we removed the test dirs 530 */ 531 public boolean cleanupDataTestDirOnTestFS() throws IOException { 532 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 533 if (ret) { 534 dataTestDirOnTestFS = null; 535 } 536 return ret; 537 } 538 539 /** 540 * Cleans a subdirectory under the test data directory on the test filesystem. 541 * @return True if we removed child 542 */ 543 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 544 Path cpath = getDataTestDirOnTestFS(subdirName); 545 return getTestFileSystem().delete(cpath, true); 546 } 547 548 /** 549 * Start a minidfscluster. 550 * @param servers How many DNs to start. 551 * @see #shutdownMiniDFSCluster() 552 * @return The mini dfs cluster created. 553 */ 554 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 555 return startMiniDFSCluster(servers, null); 556 } 557 558 /** 559 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 560 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 561 * instances of the datanodes will have the same host name. 
562 * @param hosts hostnames DNs to run on. 563 * @see #shutdownMiniDFSCluster() 564 * @return The mini dfs cluster created. 565 */ 566 public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception { 567 if (hosts != null && hosts.length != 0) { 568 return startMiniDFSCluster(hosts.length, hosts); 569 } else { 570 return startMiniDFSCluster(1, null); 571 } 572 } 573 574 /** 575 * Start a minidfscluster. Can only create one. 576 * @param servers How many DNs to start. 577 * @param hosts hostnames DNs to run on. 578 * @see #shutdownMiniDFSCluster() 579 * @return The mini dfs cluster created. 580 */ 581 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception { 582 return startMiniDFSCluster(servers, null, hosts); 583 } 584 585 private void setFs() throws IOException { 586 if (this.dfsCluster == null) { 587 LOG.info("Skipping setting fs because dfsCluster is null"); 588 return; 589 } 590 FileSystem fs = this.dfsCluster.getFileSystem(); 591 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 592 593 // re-enable this check with dfs 594 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 595 } 596 597 // Workaround to avoid IllegalThreadStateException 598 // See HBASE-27148 for more details 599 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 600 601 private volatile boolean stopped = false; 602 603 private final MiniDFSCluster cluster; 604 605 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 606 super("FsDatasetAsyncDiskServiceFixer"); 607 setDaemon(true); 608 this.cluster = cluster; 609 } 610 611 @Override 612 public void run() { 613 while (!stopped) { 614 try { 615 Thread.sleep(30000); 616 } catch (InterruptedException e) { 617 Thread.currentThread().interrupt(); 618 continue; 619 } 620 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 621 // timeout of the thread pool executor is 60 seconds by default. 
622 try { 623 for (DataNode dn : cluster.getDataNodes()) { 624 FsDatasetSpi<?> dataset = dn.getFSDataset(); 625 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 626 service.setAccessible(true); 627 Object asyncDiskService = service.get(dataset); 628 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 629 group.setAccessible(true); 630 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 631 if (threadGroup.isDaemon()) { 632 threadGroup.setDaemon(false); 633 } 634 } 635 } catch (NoSuchFieldException e) { 636 LOG.debug("NoSuchFieldException: " + e.getMessage() 637 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 638 + "See HBASE-27595 for details."); 639 } catch (Exception e) { 640 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 641 } 642 } 643 } 644 645 void shutdown() { 646 stopped = true; 647 interrupt(); 648 } 649 } 650 651 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts) 652 throws Exception { 653 createDirsAndSetProperties(); 654 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 655 656 this.dfsCluster = 657 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 658 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 659 this.dfsClusterFixer.start(); 660 // Set this just-started cluster as our filesystem. 
661 setFs(); 662 663 // Wait for the cluster to be totally up 664 this.dfsCluster.waitClusterUp(); 665 666 // reset the test directory for test file system 667 dataTestDirOnTestFS = null; 668 String dataTestDir = getDataTestDir().toString(); 669 conf.set(HConstants.HBASE_DIR, dataTestDir); 670 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 671 672 return this.dfsCluster; 673 } 674 675 public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { 676 createDirsAndSetProperties(); 677 dfsCluster = 678 new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); 679 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 680 this.dfsClusterFixer.start(); 681 return dfsCluster; 682 } 683 684 /** 685 * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to 686 * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in 687 * the conf. 
688 * 689 * <pre> 690 * Configuration conf = TEST_UTIL.getConfiguration(); 691 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 692 * Map.Entry<String, String> e = i.next(); 693 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 694 * } 695 * </pre> 696 */ 697 private void createDirsAndSetProperties() throws IOException { 698 setupClusterTestDir(); 699 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 700 createDirAndSetProperty("test.cache.data"); 701 createDirAndSetProperty("hadoop.tmp.dir"); 702 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 703 createDirAndSetProperty("mapreduce.cluster.local.dir"); 704 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 705 enableShortCircuit(); 706 707 Path root = getDataTestDirOnTestFS("hadoop"); 708 conf.set(MapreduceTestingShim.getMROutputDirProp(), 709 new Path(root, "mapred-output-dir").toString()); 710 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 711 conf.set("mapreduce.jobtracker.staging.root.dir", 712 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 713 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 714 conf.set("yarn.app.mapreduce.am.staging-dir", 715 new Path(root, "mapreduce-am-staging-root-dir").toString()); 716 717 // Frustrate yarn's and hdfs's attempts at writing /tmp. 718 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 
719 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 720 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 721 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 722 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 723 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 724 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 725 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 726 createDirAndSetProperty("dfs.journalnode.edits.dir"); 727 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 728 createDirAndSetProperty("nfs.dump.dir"); 729 createDirAndSetProperty("java.io.tmpdir"); 730 createDirAndSetProperty("dfs.journalnode.edits.dir"); 731 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 732 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 733 }

  /**
   * Checks whether tests should enable NEW_VERSION_BEHAVIOR on the column families they create.
   * Controlled by the system property {@code hbase.tests.new.version.behavior}. When the property
   * is unset, or set to anything other than a case-insensitive {@code "true"}, this returns
   * {@code false}.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    // Boolean.parseBoolean(null) is false, which yields the documented default.
    return Boolean.parseBoolean(System.getProperty(propName));
  }

  /**
   * Returns whether short-circuit reads should be turned on for the test cluster. The system
   * property {@code hbase.tests.use.shortcircuit.reads} is consulted first, so the setting can be
   * given on the command line. Otherwise the same key is read from the configuration. If neither
   * is set, the default is {@code false}.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    }
    return conf.getBoolean(propName, false);
  }

  /**
   * Enables short-circuit reads when {@link #isReadShortCircuitOn()} says so. It sets both the
   * HBase and HDFS client settings, including skipping the HDFS checksum checks. Otherwise it only
   * logs that short-circuit reads are off.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user {}", curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  /**
   * Same as {@link #createDirAndSetProperty(String, String)}, using {@code property} both as the
   * directory name relative to the test data dir and as the property key.
   * @return the directory path that was set
   */
  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  /**
   * Resolves {@code relPath} under the test data directory. The resulting path is stored under
   * {@code property} in both the JVM system properties and the HBase configuration, and the
   * directory is created. Creation is best effort: the result of {@link File#mkdirs()} is not
   * checked.
   * @param relPath  directory name relative to {@code getDataTestDir}
   * @param property key to set in system properties and in the configuration
   * @return the directory path that was set
   */
  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting {} to {} in system properties and HBase conf", property, path);
    return path;
  }

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)}, or does
   * nothing if no DFS cluster is set. When a DFS cluster was running, this also:
   * <ul>
   * <li>shuts down its fixer, if any;</li>
   * <li>forgets the test-FS data directory;</li>
   * <li>points the default filesystem back at {@code file:///}.</li>
   * </ul>
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given slave node number.
   * All other options use default values, defined in {@link StartTestingClusterOption.Builder}.
   * @param numSlaves slave node number, for both HBase region servers and HDFS data nodes.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper, all using default options. Default option
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, and dfs and zookeeper if needed. It modifies the
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit. DFS is only started if no DFS
   * cluster is set yet, and ZooKeeper only if none is running.
   * @throws IllegalStateException if a mini cluster is already running
   * @see #shutdownMiniCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up a mini hbase cluster. Usually you won't want this; you'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters. The method
   * returns only after a full scan of hbase:meta has succeeded.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings make the server wait until exactly this number of region servers are
    // connected. They are only applied when no value (-1, see cleanup()) is configured yet.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // drain the scanner; only a complete scan matters
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up a mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up a mini hbase cluster. Usually you won't want this; you'll usually want
   * {@link #startMiniCluster()}. All other options use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up a mini hbase cluster. Usually you won't want this; you'll usually want
   * {@link #startMiniCluster()}. All other options use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up a mini hbase cluster. Usually you won't want this; you'll usually want
   * {@link #startMiniCluster()}. All other options use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
      .createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  /**
   * Restarts the hbase cluster with the given number of region servers, then invalidates the
   * cached connection.
   * @param servers number of region servers
   * @param ports   ports the region servers should use, or null
   */
  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  /**
   * Closes the current connection, starts a new hbase cluster on the existing configuration and
   * returns only after a full scan of hbase:meta has succeeded.
   */
  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta. try-with-resources
    // closes scanner, table and connection (in that order) even if the scan fails.
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
      LOG.info("HBase has been restarted");
    }
  }

  /**
   * Returns the current mini hbase cluster. It only has something in it after a call to
   * {@link #startMiniCluster()}; otherwise this returns null.
   * @throws RuntimeException if the current cluster is not a {@link SingleProcessHBaseCluster}
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops the mini hbase, zk, and hdfs clusters, then cleans up the test dir.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster gracefully and waits for it to stop. Does not shut down zk or
   * dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException throws in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Closes the hbase admin and current connection, and resets the MIN/MAX region-server-to-start
   * configs to -1 so the next startup recomputes them.
   */
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, the previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    return create ? getNewDataTestDirOnTestFS() : getDataTestDirOnTestFS();
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that the
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir at {@link #getDefaultRootDirPath(boolean)} and records it as the root
   * dir in the configuration. It also writes the hbase version file into it. Normally you won't
   * make use of this method: the root hbasedir is created for you as part of mini cluster startup.
   * You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path, if one has
   *               already been fetched. Note: mkdirs is called regardless of whether the path has
   *               been fetched before or the directory already exists.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that the
   * <code>create</code> flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase WAL root dir at a fresh path from {@code getNewDataTestDirOnTestFS()} and
   * records it in the configuration as the WAL root dir. Normally you won't make use of this
   * method: the WAL root dir is created for you as part of mini cluster startup. You'd only use
   * this method if you were doing manual operation.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  /**
   * Ensures {@code hbase.fs.tmp.dir} has a value. If unset, it is pointed at an
   * {@code hbase-staging} directory on the test filesystem.
   */
  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to {}", this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to {}", hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster.
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes the caches of the given table in the mini hbase cluster.
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compacts all regions in the mini hbase cluster.
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compacts all of a table's regions in the mini hbase cluster.
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Enables NEW_VERSION_BEHAVIOR on {@code cfBuilder} when {@link #isNewVersionBehaviorEnabled()}
   * returns true; otherwise leaves it untouched.
   * @return the same builder, for chaining
   */
  private ColumnFamilyDescriptorBuilder
    withNewVersionBehaviorIfEnabled(ColumnFamilyDescriptorBuilder cfBuilder) {
    if (isNewVersionBehaviorEnabled()) {
      cfBuilder.setNewVersionBehavior(true);
    }
    return cfBuilder;
  }

  /**
   * Creates the table described by {@code td}, pre-split on {@code splitKeys} when non-null. Waits
   * until all its regions are assigned and returns a Table for it.
   */
  private Table createTableAndWaitUntilAssigned(TableDescriptor td, @Nullable byte[][] splitKeys)
    throws IOException {
    if (splitKeys != null) {
      getAdmin().createTable(td, splitKeys);
    } else {
      getAdmin().createTable(td);
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
    // assigned
    waitUntilAllRegionsAssigned(td.getTableName());
    return getConnection().getTable(td.getTableName());
  }

  /**
   * Create a table with a single column family.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table with the given column families.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table with a single column family.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions. Split keys are computed with {@code Bytes.split} between
   * {@code "aaaaa"} and {@code "zzzzz"}.
   * @param numRegions number of regions; must be at least 3
   * @return A Table instance for the created table.
   * @throws IOException if {@code numRegions < 3} or table creation fails
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) {
      throw new IOException("Must create at least 3 regions");
    }
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a single-region table with the given column families.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }

  /**
   * Create a table with multiple regions, pre-split on {@code KEYS_FOR_HBA_CREATE_TABLE}.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions, pre-split on {@code KEYS_FOR_HBA_CREATE_TABLE}.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table with region replication 1.
   * @param splitKeys split keys, or null for a single region
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys, or null for a single region
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException throws IOException
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  /**
   * Create a table whose regions are computed by the admin from a key range.
   * @param numVersions max versions for every family
   * @param startKey    first split boundary
   * @param endKey      last split boundary
   * @param numRegions  total number of regions
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
    byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a single-region table.
   * @param c Configuration to use (note: not read by the eventual implementation)
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table with bloom filters disabled and the default block size.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys, or null for a single region
   * @param c         Configuration to use (note: not read by the eventual implementation)
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }

  /**
   * Create a table from {@code htd} plus one new column family per entry in {@code families}.
   * @param htd       table descriptor; must not already contain any of {@code families}
   * @param families  array of column families
   * @param splitKeys array of split keys, or null for a single region
   * @param type      Bloom type
   * @param blockSize block size
   * @param c         Configuration to use; currently not read by this method
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    BloomType type, int blockSize, Configuration c) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setBloomFilterType(type).setBlocksize(blockSize);
      builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfdb).build());
    }
    return createTableAndWaitUntilAssigned(builder.build(), splitKeys);
  }

  /**
   * Create a table from a complete descriptor. When {@link #isNewVersionBehaviorEnabled()} is true,
   * every family of {@code htd} is switched to NEW_VERSION_BEHAVIOR first.
   * @param htd       table descriptor
   * @param splitRows array of split keys, or null for a single region
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    if (isNewVersionBehaviorEnabled()) {
      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        // The family was already copied into the builder from htd, so it has to be replaced with
        // modifyColumnFamily; setColumnFamily is for adding families that are not yet present.
        builder.modifyColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
      }
    }
    return createTableAndWaitUntilAssigned(builder.build(), splitRows);
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys, or null for a single region
   * @param replicaCount the replica count
   * @param c            Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount, final Configuration c) throws IOException {
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
    return createTable(htd, families, splitKeys, c);
  }

  /**
   * Create a table with a single family keeping {@code numVersions} versions.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
    return createTable(tableName, new byte[][] { family }, numVersions);
  }

  /**
   * Create a single-region table whose families keep {@code numVersions} versions.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, (byte[][]) null);
  }

  /**
   * Create a table whose families keep {@code numVersions} versions.
   * @param splitKeys split keys, or null for a single region
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[][] splitKeys) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
      builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfBuilder).build());
    }
    return createTableAndWaitUntilAssigned(builder.build(), splitKeys);
  }

  /**
   * Create a table with multiple regions, pre-split on {@code KEYS_FOR_HBA_CREATE_TABLE}.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a single-region table with the given max versions and block size for every family.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
    throws IOException {
    return createTable(tableName, families, numVersions, blockSize, null);
  }

  /**
   * Create a single-region table with the given max versions and block size for every family,
   * optionally registering a coprocessor.
   * @param cpName coprocessor class name, or null for none
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
    String cpName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setMaxVersions(numVersions).setBlocksize(blockSize);
      builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfBuilder).build());
    }
    if (cpName != null) {
      builder.setCoprocessor(cpName);
    }
    return createTableAndWaitUntilAssigned(builder.build(), null);
  }

  /**
   * Create a single-region table with a per-family max version count.
   * @param numVersions max versions, index-aligned with {@code families}; must be at least as long
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < families.length; i++) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(families[i]).setMaxVersions(numVersions[i]);
      builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfBuilder).build());
    }
    return createTableAndWaitUntilAssigned(builder.build(), null);
  }

  /**
   * Create a table with a single family.
   * @param splitRows split keys, or null for a single region
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
    builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfBuilder).build());
    return createTableAndWaitUntilAssigned(builder.build(), splitRows);
  }

  /**
   * Create a table with multiple regions, pre-split on {@code KEYS_FOR_HBA_CREATE_TABLE}.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Sets the number of region replicas of an existing table by modifying its descriptor.
   */
  public static void setReplicas(Admin admin, TableName table, int replicaCount)
    throws IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc);
  }

  /**
   * Sets the number of region replicas of an existing table by modifying its descriptor, blocking
   * on the async admin futures.
   */
  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
    throws ExecutionException, IOException, InterruptedException {
    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
      .setRegionReplication(replicaCount).build();
    admin.modifyTable(desc).get();
  }

  /**
   * Drops an existing table, disabling it first unless it is already disabled.
   * @param tableName existing table
   */
  public void deleteTable(TableName tableName) throws IOException {
    try {
      getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException e) {
      LOG.debug("Table: {} already disabled, so just deleting it.", tableName);
    }
    getAdmin().deleteTable(tableName);
  }

  /**
   * Drops a table if it exists; a missing table is silently ignored.
   * @param tableName table to drop
   */
  public void deleteTableIfAny(TableName tableName) throws IOException {
    try {
      deleteTable(tableName);
    } catch (TableNotFoundException ignored) {
      // Nothing to delete.
    }
  }

  // ==========================================================================
  // Canned table and table descriptor creation

  public static final byte[] fam1 = Bytes.toBytes("colfamily11");
  public static final byte[] fam2 = Bytes.toBytes("colfamily21");
  public static final byte[] fam3 = Bytes.toBytes("colfamily31");
  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
  private static final int MAXVERSIONS = 3;

  public static final char FIRST_CHAR = 'a';
  public static final char LAST_CHAR = 'z';
  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);

  /**
   * Returns a builder for table {@code name} with the three canned families, using default min
   * versions, {@code MAXVERSIONS} max versions, no TTL and default keep-deleted-cells.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
    return createModifyableTableDescriptor(TableName.valueOf(name),
      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
  }

  /**
   * Builds a descriptor for table {@code name} with the three canned families. It is identical to
   * {@link #createModifyableTableDescriptor(TableName, int, int, int, KeepDeletedCells)} followed
   * by {@code build()}.
   */
  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    return createModifyableTableDescriptor(name, minVersions, versions, ttl, keepDeleted).build();
  }

  /**
   * Returns a builder for table {@code name} with the three canned families (fam1, fam2, fam3).
   * Each family uses the given min/max versions, TTL and keep-deleted-cells setting, has block
   * cache disabled, and gets NEW_VERSION_BEHAVIOR when {@link #isNewVersionBehaviorEnabled()} is
   * true.
   */
  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
        .setBlockCacheEnabled(false).setTimeToLive(ttl);
      builder.setColumnFamily(withNewVersionBehaviorIfEnabled(cfBuilder).build());
    }
    return builder;
  }

  /**
   * Create a table
of name <code>name</code>. 1661 * @param name Name to give table. 1662 * @return Column descriptor. 1663 */ 1664 public TableDescriptor createTableDescriptor(final TableName name) { 1665 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1666 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1667 } 1668 1669 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1670 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1671 } 1672 1673 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1674 int maxVersions) { 1675 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1676 for (byte[] family : families) { 1677 ColumnFamilyDescriptorBuilder cfBuilder = 1678 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1679 if (isNewVersionBehaviorEnabled()) { 1680 cfBuilder.setNewVersionBehavior(true); 1681 } 1682 builder.setColumnFamily(cfBuilder.build()); 1683 } 1684 return builder.build(); 1685 } 1686 1687 /** 1688 * Create an HRegion that writes to the local tmp dirs 1689 * @param desc a table descriptor indicating which table the region belongs to 1690 * @param startKey the start boundary of the region 1691 * @param endKey the end boundary of the region 1692 * @return a region that writes to local dir for testing 1693 */ 1694 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1695 throws IOException { 1696 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1697 .setEndKey(endKey).build(); 1698 return createLocalHRegion(hri, desc); 1699 } 1700 1701 /** 1702 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call 1703 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it. 
1704 */ 1705 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1706 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1707 } 1708 1709 /** 1710 * Create an HRegion that writes to the local tmp dirs with specified wal 1711 * @param info regioninfo 1712 * @param conf configuration 1713 * @param desc table descriptor 1714 * @param wal wal for this region. 1715 * @return created hregion 1716 */ 1717 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1718 WAL wal) throws IOException { 1719 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 1720 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1721 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1722 } 1723 1724 /** 1725 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} 1726 * when done. 1727 */ 1728 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1729 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1730 throws IOException { 1731 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1732 durability, wal, null, families); 1733 } 1734 1735 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1736 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1737 boolean[] compactedMemStore, byte[]... 
families) throws IOException { 1738 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1739 builder.setReadOnly(isReadOnly); 1740 int i = 0; 1741 for (byte[] family : families) { 1742 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1743 if (compactedMemStore != null && i < compactedMemStore.length) { 1744 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1745 } else { 1746 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1747 1748 } 1749 i++; 1750 // Set default to be three versions. 1751 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1752 builder.setColumnFamily(cfBuilder.build()); 1753 } 1754 builder.setDurability(durability); 1755 RegionInfo info = 1756 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1757 return createLocalHRegion(info, conf, builder.build(), wal); 1758 } 1759 1760 // 1761 // ========================================================================== 1762 1763 /** 1764 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1765 * read. 1766 * @param tableName existing table 1767 * @return HTable to that new table 1768 */ 1769 public Table deleteTableData(TableName tableName) throws IOException { 1770 Table table = getConnection().getTable(tableName); 1771 Scan scan = new Scan(); 1772 ResultScanner resScan = table.getScanner(scan); 1773 for (Result res : resScan) { 1774 Delete del = new Delete(res.getRow()); 1775 table.delete(del); 1776 } 1777 resScan = table.getScanner(scan); 1778 resScan.close(); 1779 return table; 1780 } 1781 1782 /** 1783 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1784 * table. 1785 * @param tableName table which must exist. 
1786 * @param preserveRegions keep the existing split points 1787 * @return HTable for the new table 1788 */ 1789 public Table truncateTable(final TableName tableName, final boolean preserveRegions) 1790 throws IOException { 1791 Admin admin = getAdmin(); 1792 if (!admin.isTableDisabled(tableName)) { 1793 admin.disableTable(tableName); 1794 } 1795 admin.truncateTable(tableName, preserveRegions); 1796 return getConnection().getTable(tableName); 1797 } 1798 1799 /** 1800 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1801 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not 1802 * preserve regions of existing table. 1803 * @param tableName table which must exist. 1804 * @return HTable for the new table 1805 */ 1806 public Table truncateTable(final TableName tableName) throws IOException { 1807 return truncateTable(tableName, false); 1808 } 1809 1810 /** 1811 * Load table with rows from 'aaa' to 'zzz'. 1812 * @param t Table 1813 * @param f Family 1814 * @return Count of rows loaded. 1815 */ 1816 public int loadTable(final Table t, final byte[] f) throws IOException { 1817 return loadTable(t, new byte[][] { f }); 1818 } 1819 1820 /** 1821 * Load table with rows from 'aaa' to 'zzz'. 1822 * @param t Table 1823 * @param f Family 1824 * @return Count of rows loaded. 1825 */ 1826 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1827 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1828 } 1829 1830 /** 1831 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1832 * @param t Table 1833 * @param f Array of Families to load 1834 * @return Count of rows loaded. 1835 */ 1836 public int loadTable(final Table t, final byte[][] f) throws IOException { 1837 return loadTable(t, f, null); 1838 } 1839 1840 /** 1841 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 
1842 * @param t Table 1843 * @param f Array of Families to load 1844 * @param value the values of the cells. If null is passed, the row key is used as value 1845 * @return Count of rows loaded. 1846 */ 1847 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1848 return loadTable(t, f, value, true); 1849 } 1850 1851 /** 1852 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1853 * @param t Table 1854 * @param f Array of Families to load 1855 * @param value the values of the cells. If null is passed, the row key is used as value 1856 * @return Count of rows loaded. 1857 */ 1858 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1859 throws IOException { 1860 List<Put> puts = new ArrayList<>(); 1861 for (byte[] row : HBaseTestingUtil.ROWS) { 1862 Put put = new Put(row); 1863 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1864 for (int i = 0; i < f.length; i++) { 1865 byte[] value1 = value != null ? 
value : row; 1866 put.addColumn(f[i], f[i], value1); 1867 } 1868 puts.add(put); 1869 } 1870 t.put(puts); 1871 return puts.size(); 1872 } 1873 1874 /** 1875 * A tracker for tracking and validating table rows generated with 1876 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1877 */ 1878 public static class SeenRowTracker { 1879 int dim = 'z' - 'a' + 1; 1880 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1881 byte[] startRow; 1882 byte[] stopRow; 1883 1884 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1885 this.startRow = startRow; 1886 this.stopRow = stopRow; 1887 } 1888 1889 void reset() { 1890 for (byte[] row : ROWS) { 1891 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1892 } 1893 } 1894 1895 int i(byte b) { 1896 return b - 'a'; 1897 } 1898 1899 public void addRow(byte[] row) { 1900 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1901 } 1902 1903 /** 1904 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1905 * rows none 1906 */ 1907 public void validate() { 1908 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1909 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1910 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1911 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1912 int expectedCount = 0; 1913 if ( 1914 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1915 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1916 ) { 1917 expectedCount = 1; 1918 } 1919 if (count != expectedCount) { 1920 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1921 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1922 + "instead of " + expectedCount); 1923 } 1924 } 1925 } 1926 } 1927 } 1928 } 1929 1930 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1931 return loadRegion(r, f, false); 1932 } 1933 1934 public int loadRegion(final Region r, final byte[] f) throws IOException { 1935 return loadRegion((HRegion) r, f); 
1936 } 1937 1938 /** 1939 * Load region with rows from 'aaa' to 'zzz'. 1940 * @param r Region 1941 * @param f Family 1942 * @param flush flush the cache if true 1943 * @return Count of rows loaded. 1944 */ 1945 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1946 byte[] k = new byte[3]; 1947 int rowCount = 0; 1948 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1949 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1950 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1951 k[0] = b1; 1952 k[1] = b2; 1953 k[2] = b3; 1954 Put put = new Put(k); 1955 put.setDurability(Durability.SKIP_WAL); 1956 put.addColumn(f, null, k); 1957 if (r.getWAL() == null) { 1958 put.setDurability(Durability.SKIP_WAL); 1959 } 1960 int preRowCount = rowCount; 1961 int pause = 10; 1962 int maxPause = 1000; 1963 while (rowCount == preRowCount) { 1964 try { 1965 r.put(put); 1966 rowCount++; 1967 } catch (RegionTooBusyException e) { 1968 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1969 Threads.sleep(pause); 1970 } 1971 } 1972 } 1973 } 1974 if (flush) { 1975 r.flush(true); 1976 } 1977 } 1978 return rowCount; 1979 } 1980 1981 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1982 throws IOException { 1983 for (int i = startRow; i < endRow; i++) { 1984 byte[] data = Bytes.toBytes(String.valueOf(i)); 1985 Put put = new Put(data); 1986 put.addColumn(f, null, data); 1987 t.put(put); 1988 } 1989 } 1990 1991 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1992 throws IOException { 1993 for (int i = 0; i < totalRows; i++) { 1994 byte[] row = new byte[rowSize]; 1995 Bytes.random(row); 1996 Put put = new Put(row); 1997 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 1998 t.put(put); 1999 } 2000 } 2001 2002 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 2003 int replicaId) throws IOException { 2004 for (int i = startRow; i < endRow; i++) { 2005 String failMsg = 
"Failed verification of row :" + i; 2006 byte[] data = Bytes.toBytes(String.valueOf(i)); 2007 Get get = new Get(data); 2008 get.setReplicaId(replicaId); 2009 get.setConsistency(Consistency.TIMELINE); 2010 Result result = table.get(get); 2011 assertTrue(failMsg, result.containsColumn(f, null)); 2012 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2013 Cell cell = result.getColumnLatestCell(f, null); 2014 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2015 cell.getValueOffset(), cell.getValueLength())); 2016 } 2017 } 2018 2019 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2020 throws IOException { 2021 verifyNumericRows((HRegion) region, f, startRow, endRow); 2022 } 2023 2024 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2025 throws IOException { 2026 verifyNumericRows(region, f, startRow, endRow, true); 2027 } 2028 2029 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2030 final boolean present) throws IOException { 2031 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2032 } 2033 2034 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2035 final boolean present) throws IOException { 2036 for (int i = startRow; i < endRow; i++) { 2037 String failMsg = "Failed verification of row :" + i; 2038 byte[] data = Bytes.toBytes(String.valueOf(i)); 2039 Result result = region.get(new Get(data)); 2040 2041 boolean hasResult = result != null && !result.isEmpty(); 2042 assertEquals(failMsg + result, present, hasResult); 2043 if (!present) continue; 2044 2045 assertTrue(failMsg, result.containsColumn(f, null)); 2046 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2047 Cell cell = result.getColumnLatestCell(f, null); 2048 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2049 cell.getValueOffset(), 
cell.getValueLength())); 2050 } 2051 } 2052 2053 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2054 throws IOException { 2055 for (int i = startRow; i < endRow; i++) { 2056 byte[] data = Bytes.toBytes(String.valueOf(i)); 2057 Delete delete = new Delete(data); 2058 delete.addFamily(f); 2059 t.delete(delete); 2060 } 2061 } 2062 2063 /** 2064 * Return the number of rows in the given table. 2065 * @param table to count rows 2066 * @return count of rows 2067 */ 2068 public static int countRows(final Table table) throws IOException { 2069 return countRows(table, new Scan()); 2070 } 2071 2072 public static int countRows(final Table table, final Scan scan) throws IOException { 2073 try (ResultScanner results = table.getScanner(scan)) { 2074 int count = 0; 2075 while (results.next() != null) { 2076 count++; 2077 } 2078 return count; 2079 } 2080 } 2081 2082 public static int countRows(final Table table, final byte[]... families) throws IOException { 2083 Scan scan = new Scan(); 2084 for (byte[] family : families) { 2085 scan.addFamily(family); 2086 } 2087 return countRows(table, scan); 2088 } 2089 2090 /** 2091 * Return the number of rows in the given table. 
2092 */ 2093 public int countRows(final TableName tableName) throws IOException { 2094 try (Table table = getConnection().getTable(tableName)) { 2095 return countRows(table); 2096 } 2097 } 2098 2099 public static int countRows(final Region region) throws IOException { 2100 return countRows(region, new Scan()); 2101 } 2102 2103 public static int countRows(final Region region, final Scan scan) throws IOException { 2104 try (InternalScanner scanner = region.getScanner(scan)) { 2105 return countRows(scanner); 2106 } 2107 } 2108 2109 public static int countRows(final InternalScanner scanner) throws IOException { 2110 int scannedCount = 0; 2111 List<Cell> results = new ArrayList<>(); 2112 boolean hasMore = true; 2113 while (hasMore) { 2114 hasMore = scanner.next(results); 2115 scannedCount += results.size(); 2116 results.clear(); 2117 } 2118 return scannedCount; 2119 } 2120 2121 /** 2122 * Return an md5 digest of the entire contents of a table. 2123 */ 2124 public String checksumRows(final Table table) throws Exception { 2125 MessageDigest digest = MessageDigest.getInstance("MD5"); 2126 try (ResultScanner results = table.getScanner(new Scan())) { 2127 for (Result res : results) { 2128 digest.update(res.getRow()); 2129 } 2130 } 2131 return digest.toString(); 2132 } 2133 2134 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2135 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2136 static { 2137 int i = 0; 2138 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2139 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2140 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2141 ROWS[i][0] = b1; 2142 ROWS[i][1] = b2; 2143 ROWS[i][2] = b3; 2144 i++; 2145 } 2146 } 2147 } 2148 } 2149 2150 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2151 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2152 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), 
Bytes.toBytes("jjj"), 2153 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2154 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2155 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2156 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2157 2158 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2159 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2160 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2161 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2162 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2163 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2164 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2165 2166 /** 2167 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2168 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 
2169 * @return list of region info for regions added to meta 2170 */ 2171 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2172 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2173 try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 2174 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2175 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2176 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2177 TableState.State.ENABLED); 2178 // add custom ones 2179 for (int i = 0; i < startKeys.length; i++) { 2180 int j = (i + 1) % startKeys.length; 2181 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2182 .setEndKey(startKeys[j]).build(); 2183 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2184 newRegions.add(hri); 2185 } 2186 return newRegions; 2187 } 2188 } 2189 2190 /** 2191 * Create an unmanaged WAL. Be sure to close it when you're through. 2192 */ 2193 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri) 2194 throws IOException { 2195 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 2196 // unless I pass along via the conf. 2197 Configuration confForWAL = new Configuration(conf); 2198 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 2199 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); 2200 } 2201 2202 /** 2203 * Create a region with it's own WAL. Be sure to call 2204 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2205 */ 2206 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2207 final Configuration conf, final TableDescriptor htd) throws IOException { 2208 return createRegionAndWAL(info, rootDir, conf, htd, true); 2209 } 2210 2211 /** 2212 * Create a region with it's own WAL. 
Be sure to call 2213 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2214 */ 2215 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2216 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException { 2217 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2218 region.setBlockCache(blockCache); 2219 region.initialize(); 2220 return region; 2221 } 2222 2223 /** 2224 * Create a region with it's own WAL. Be sure to call 2225 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2226 */ 2227 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2228 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache) 2229 throws IOException { 2230 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2231 region.setMobFileCache(mobFileCache); 2232 region.initialize(); 2233 return region; 2234 } 2235 2236 /** 2237 * Create a region with it's own WAL. Be sure to call 2238 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 
2239 */ 2240 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2241 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2242 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2243 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2244 WAL wal = createWal(conf, rootDir, info); 2245 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2246 } 2247 2248 /** 2249 * Find any other region server which is different from the one identified by parameter 2250 * @return another region server 2251 */ 2252 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2253 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2254 if (!(rst.getRegionServer() == rs)) { 2255 return rst.getRegionServer(); 2256 } 2257 } 2258 return null; 2259 } 2260 2261 /** 2262 * Tool to get the reference to the region server object that holds the region of the specified 2263 * user table. 
2264 * @param tableName user table to lookup in hbase:meta 2265 * @return region server that holds it, null if the row doesn't exist 2266 */ 2267 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2268 throws IOException, InterruptedException { 2269 List<RegionInfo> regions = getAdmin().getRegions(tableName); 2270 if (regions == null || regions.isEmpty()) { 2271 return null; 2272 } 2273 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2274 2275 byte[] firstRegionName = 2276 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2277 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2278 2279 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2280 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2281 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2282 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2283 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2284 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2285 while (retrier.shouldRetry()) { 2286 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2287 if (index != -1) { 2288 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2289 } 2290 // Came back -1. Region may not be online yet. Sleep a while. 2291 retrier.sleepUntilNextRetry(); 2292 } 2293 return null; 2294 } 2295 2296 /** 2297 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2298 * @throws IOException When starting the cluster fails. 2299 */ 2300 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2301 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 
2302 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2303 "99.0"); 2304 startMiniMapReduceCluster(2); 2305 return mrCluster; 2306 } 2307 2308 /** 2309 * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its 2310 * internal static LOG_DIR variable. 2311 */ 2312 private void forceChangeTaskLogDir() { 2313 Field logDirField; 2314 try { 2315 logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); 2316 logDirField.setAccessible(true); 2317 2318 Field modifiersField = ReflectionUtils.getModifiersField(); 2319 modifiersField.setAccessible(true); 2320 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); 2321 2322 logDirField.set(null, new File(hadoopLogDir, "userlogs")); 2323 } catch (SecurityException e) { 2324 throw new RuntimeException(e); 2325 } catch (NoSuchFieldException e) { 2326 throw new RuntimeException(e); 2327 } catch (IllegalArgumentException e) { 2328 throw new RuntimeException(e); 2329 } catch (IllegalAccessException e) { 2330 throw new RuntimeException(e); 2331 } 2332 } 2333 2334 /** 2335 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2336 * filesystem. 2337 * @param servers The number of <code>TaskTracker</code>'s to start. 2338 * @throws IOException When starting the cluster fails. 2339 */ 2340 private void startMiniMapReduceCluster(final int servers) throws IOException { 2341 if (mrCluster != null) { 2342 throw new IllegalStateException("MiniMRCluster is already running"); 2343 } 2344 LOG.info("Starting mini mapreduce cluster..."); 2345 setupClusterTestDir(); 2346 createDirsAndSetProperties(); 2347 2348 forceChangeTaskLogDir(); 2349 2350 //// hadoop2 specific settings 2351 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2352 // we up the VM usable so that processes don't get killed. 
2353 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2354 2355 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2356 // this avoids the problem by disabling speculative task execution in tests. 2357 conf.setBoolean("mapreduce.map.speculative", false); 2358 conf.setBoolean("mapreduce.reduce.speculative", false); 2359 //// 2360 2361 // Yarn container runs in independent JVM. We need to pass the argument manually here if the 2362 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 2363 if (JVM.getJVMSpecVersion() >= 17) { 2364 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2365 conf.set("yarn.app.mapreduce.am.command-opts", 2366 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2367 } 2368 2369 // Allow the user to override FS URI for this map-reduce cluster to use. 2370 mrCluster = 2371 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2372 1, null, null, new JobConf(this.conf)); 2373 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2374 if (jobConf == null) { 2375 jobConf = mrCluster.createJobConf(); 2376 } 2377 2378 // Hadoop MiniMR overwrites this while it should not 2379 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2380 LOG.info("Mini mapreduce cluster started"); 2381 2382 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2383 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2384 // necessary config properties here. YARN-129 required adding a few properties. 
2385 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2386 // this for mrv2 support; mr1 ignores this 2387 conf.set("mapreduce.framework.name", "yarn"); 2388 conf.setBoolean("yarn.is.minicluster", true); 2389 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2390 if (rmAddress != null) { 2391 conf.set("yarn.resourcemanager.address", rmAddress); 2392 } 2393 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2394 if (historyAddress != null) { 2395 conf.set("mapreduce.jobhistory.address", historyAddress); 2396 } 2397 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2398 if (schedulerAddress != null) { 2399 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2400 } 2401 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2402 if (mrJobHistoryWebappAddress != null) { 2403 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2404 } 2405 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2406 if (yarnRMWebappAddress != null) { 2407 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2408 } 2409 } 2410 2411 /** 2412 * Stops the previously started <code>MiniMRCluster</code>. 2413 */ 2414 public void shutdownMiniMapReduceCluster() { 2415 if (mrCluster != null) { 2416 LOG.info("Stopping mini mapreduce cluster..."); 2417 mrCluster.shutdown(); 2418 mrCluster = null; 2419 LOG.info("Mini mapreduce cluster stopped"); 2420 } 2421 // Restore configuration to point to local jobtracker 2422 conf.set("mapreduce.jobtracker.address", "local"); 2423 } 2424 2425 /** 2426 * Create a stubbed out RegionServerService, mainly for getting FS. 
2427 */ 2428 public RegionServerServices createMockRegionServerService() throws IOException { 2429 return createMockRegionServerService((ServerName) null); 2430 } 2431 2432 /** 2433 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2434 * TestTokenAuthentication 2435 */ 2436 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) 2437 throws IOException { 2438 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher()); 2439 rss.setFileSystem(getTestFileSystem()); 2440 rss.setRpcServer(rpc); 2441 return rss; 2442 } 2443 2444 /** 2445 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2446 * TestOpenRegionHandler 2447 */ 2448 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException { 2449 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name); 2450 rss.setFileSystem(getTestFileSystem()); 2451 return rss; 2452 } 2453 2454 /** 2455 * Expire the Master's session 2456 */ 2457 public void expireMasterSession() throws Exception { 2458 HMaster master = getMiniHBaseCluster().getMaster(); 2459 expireSession(master.getZooKeeper(), false); 2460 } 2461 2462 /** 2463 * Expire a region server's session 2464 * @param index which RS 2465 */ 2466 public void expireRegionServerSession(int index) throws Exception { 2467 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); 2468 expireSession(rs.getZooKeeper(), false); 2469 decrementMinRegionServerCount(); 2470 } 2471 2472 private void decrementMinRegionServerCount() { 2473 // decrement the count for this.conf, for newly spwaned master 2474 // this.hbaseCluster shares this configuration too 2475 decrementMinRegionServerCount(getConfiguration()); 2476 2477 // each master thread keeps a copy of configuration 2478 for (MasterThread master : getHBaseCluster().getMasterThreads()) { 2479 
decrementMinRegionServerCount(master.getMaster().getConfiguration()); 2480 } 2481 } 2482 2483 private void decrementMinRegionServerCount(Configuration conf) { 2484 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 2485 if (currentCount != -1) { 2486 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1)); 2487 } 2488 } 2489 2490 public void expireSession(ZKWatcher nodeZK) throws Exception { 2491 expireSession(nodeZK, false); 2492 } 2493 2494 /** 2495 * Expire a ZooKeeper session as recommended in ZooKeeper documentation 2496 * http://hbase.apache.org/book.html#trouble.zookeeper 2497 * <p/> 2498 * There are issues when doing this: 2499 * <ol> 2500 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li> 2501 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li> 2502 * </ol> 2503 * @param nodeZK - the ZK watcher to expire 2504 * @param checkStatus - true to check if we can create a Table with the current configuration. 2505 */ 2506 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception { 2507 Configuration c = new Configuration(this.conf); 2508 String quorumServers = ZKConfig.getZKQuorumServersString(c); 2509 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); 2510 byte[] password = zk.getSessionPasswd(); 2511 long sessionID = zk.getSessionId(); 2512 2513 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]), 2514 // so we create a first watcher to be sure that the 2515 // event was sent. We expect that if our watcher receives the event 2516 // other watchers on the same machine will get is as well. 2517 // When we ask to close the connection, ZK does not close it before 2518 // we receive all the events, so don't have to capture the event, just 2519 // closing the connection should be enough. 
2520 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() { 2521 @Override 2522 public void process(WatchedEvent watchedEvent) { 2523 LOG.info("Monitor ZKW received event=" + watchedEvent); 2524 } 2525 }, sessionID, password); 2526 2527 // Making it expire 2528 ZooKeeper newZK = 2529 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password); 2530 2531 // ensure that we have connection to the server before closing down, otherwise 2532 // the close session event will be eaten out before we start CONNECTING state 2533 long start = EnvironmentEdgeManager.currentTime(); 2534 while ( 2535 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000 2536 ) { 2537 Thread.sleep(1); 2538 } 2539 newZK.close(); 2540 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID)); 2541 2542 // Now closing & waiting to be sure that the clients get it. 2543 monitor.close(); 2544 2545 if (checkStatus) { 2546 getConnection().getTable(TableName.META_TABLE_NAME).close(); 2547 } 2548 } 2549 2550 /** 2551 * Get the Mini HBase cluster. 2552 * @return hbase cluster 2553 * @see #getHBaseClusterInterface() 2554 */ 2555 public SingleProcessHBaseCluster getHBaseCluster() { 2556 return getMiniHBaseCluster(); 2557 } 2558 2559 /** 2560 * Returns the HBaseCluster instance. 2561 * <p> 2562 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this 2563 * should not assume that the cluster is a mini cluster or a distributed one. If the test only 2564 * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used 2565 * instead w/o the need to type-cast. 2566 */ 2567 public HBaseClusterInterface getHBaseClusterInterface() { 2568 // implementation note: we should rename this method as #getHBaseCluster(), 2569 // but this would require refactoring 90+ calls. 
2570 return hbaseCluster; 2571 } 2572 2573 /** 2574 * Resets the connections so that the next time getConnection() is called, a new connection is 2575 * created. This is needed in cases where the entire cluster / all the masters are shutdown and 2576 * the connection is not valid anymore. 2577 * <p/> 2578 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are 2579 * written, not all start() stop() calls go through this class. Most tests directly operate on the 2580 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain 2581 * the connection state automatically. Cleaning this is a much bigger refactor. 2582 */ 2583 public void invalidateConnection() throws IOException { 2584 closeConnection(); 2585 // Update the master addresses if they changed. 2586 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY); 2587 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY); 2588 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}", 2589 masterConfigBefore, masterConfAfter); 2590 conf.set(HConstants.MASTER_ADDRS_KEY, 2591 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY)); 2592 } 2593 2594 /** 2595 * Get a shared Connection to the cluster. this method is thread safe. 2596 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. 2597 */ 2598 public Connection getConnection() throws IOException { 2599 return getAsyncConnection().toConnection(); 2600 } 2601 2602 /** 2603 * Get a assigned Connection to the cluster. this method is thread safe. 2604 * @param user assigned user 2605 * @return A Connection with assigned user. 2606 */ 2607 public Connection getConnection(User user) throws IOException { 2608 return getAsyncConnection(user).toConnection(); 2609 } 2610 2611 /** 2612 * Get a shared AsyncClusterConnection to the cluster. this method is thread safe. 
2613 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown 2614 * of cluster. 2615 */ 2616 public AsyncClusterConnection getAsyncConnection() throws IOException { 2617 try { 2618 return asyncConnection.updateAndGet(connection -> { 2619 if (connection == null) { 2620 try { 2621 User user = UserProvider.instantiate(conf).getCurrent(); 2622 connection = getAsyncConnection(user); 2623 } catch (IOException ioe) { 2624 throw new UncheckedIOException("Failed to create connection", ioe); 2625 } 2626 } 2627 return connection; 2628 }); 2629 } catch (UncheckedIOException exception) { 2630 throw exception.getCause(); 2631 } 2632 } 2633 2634 /** 2635 * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe. 2636 * @param user assigned user 2637 * @return An AsyncClusterConnection with assigned user. 2638 */ 2639 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2640 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2641 } 2642 2643 public void closeConnection() throws IOException { 2644 if (hbaseAdmin != null) { 2645 Closeables.close(hbaseAdmin, true); 2646 hbaseAdmin = null; 2647 } 2648 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2649 if (asyncConnection != null) { 2650 Closeables.close(asyncConnection, true); 2651 } 2652 } 2653 2654 /** 2655 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2656 * it has no effect, it will be closed automatically when the cluster shutdowns 2657 */ 2658 public Admin getAdmin() throws IOException { 2659 if (hbaseAdmin == null) { 2660 this.hbaseAdmin = getConnection().getAdmin(); 2661 } 2662 return hbaseAdmin; 2663 } 2664 2665 private Admin hbaseAdmin = null; 2666 2667 /** 2668 * Returns an {@link Hbck} instance. Needs be closed when done. 
2669 */ 2670 public Hbck getHbck() throws IOException { 2671 return getConnection().getHbck(); 2672 } 2673 2674 /** 2675 * Unassign the named region. 2676 * @param regionName The region to unassign. 2677 */ 2678 public void unassignRegion(String regionName) throws IOException { 2679 unassignRegion(Bytes.toBytes(regionName)); 2680 } 2681 2682 /** 2683 * Unassign the named region. 2684 * @param regionName The region to unassign. 2685 */ 2686 public void unassignRegion(byte[] regionName) throws IOException { 2687 getAdmin().unassign(regionName); 2688 } 2689 2690 /** 2691 * Closes the region containing the given row. 2692 * @param row The row to find the containing region. 2693 * @param table The table to find the region. 2694 */ 2695 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 2696 unassignRegionByRow(Bytes.toBytes(row), table); 2697 } 2698 2699 /** 2700 * Closes the region containing the given row. 2701 * @param row The row to find the containing region. 2702 * @param table The table to find the region. 
2703 */ 2704 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 2705 HRegionLocation hrl = table.getRegionLocation(row); 2706 unassignRegion(hrl.getRegion().getRegionName()); 2707 } 2708 2709 /** 2710 * Retrieves a splittable region randomly from tableName 2711 * @param tableName name of table 2712 * @param maxAttempts maximum number of attempts, unlimited for value of -1 2713 * @return the HRegion chosen, null if none was found within limit of maxAttempts 2714 */ 2715 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 2716 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 2717 int regCount = regions.size(); 2718 Set<Integer> attempted = new HashSet<>(); 2719 int idx; 2720 int attempts = 0; 2721 do { 2722 regions = getHBaseCluster().getRegions(tableName); 2723 if (regCount != regions.size()) { 2724 // if there was region movement, clear attempted Set 2725 attempted.clear(); 2726 } 2727 regCount = regions.size(); 2728 // There are chances that before we get the region for the table from an RS the region may 2729 // be going for CLOSE. 
This may be because online schema change is enabled 2730 if (regCount > 0) { 2731 idx = ThreadLocalRandom.current().nextInt(regCount); 2732 // if we have just tried this region, there is no need to try again 2733 if (attempted.contains(idx)) { 2734 continue; 2735 } 2736 HRegion region = regions.get(idx); 2737 if (region.checkSplit().isPresent()) { 2738 return region; 2739 } 2740 attempted.add(idx); 2741 } 2742 attempts++; 2743 } while (maxAttempts == -1 || attempts < maxAttempts); 2744 return null; 2745 } 2746 2747 public MiniDFSCluster getDFSCluster() { 2748 return dfsCluster; 2749 } 2750 2751 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException { 2752 setDFSCluster(cluster, true); 2753 } 2754 2755 /** 2756 * Set the MiniDFSCluster 2757 * @param cluster cluster to use 2758 * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it 2759 * is set. 2760 * @throws IllegalStateException if the passed cluster is up when it is required to be down 2761 * @throws IOException if the FileSystem could not be set from the passed dfs cluster 2762 */ 2763 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown) 2764 throws IllegalStateException, IOException { 2765 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) { 2766 throw new IllegalStateException("DFSCluster is already running! Shut it down first."); 2767 } 2768 this.dfsCluster = cluster; 2769 this.setFs(); 2770 } 2771 2772 public FileSystem getTestFileSystem() throws IOException { 2773 return HFileSystem.get(conf); 2774 } 2775 2776 /** 2777 * Wait until all regions in a table have been assigned. Waits default timeout before giving up 2778 * (30 seconds). 2779 * @param table Table to wait on. 
2780 */ 2781 public void waitTableAvailable(TableName table) throws InterruptedException, IOException { 2782 waitTableAvailable(table.getName(), 30000); 2783 } 2784 2785 public void waitTableAvailable(TableName table, long timeoutMillis) 2786 throws InterruptedException, IOException { 2787 waitFor(timeoutMillis, predicateTableAvailable(table)); 2788 } 2789 2790 /** 2791 * Wait until all regions in a table have been assigned 2792 * @param table Table to wait on. 2793 * @param timeoutMillis Timeout. 2794 */ 2795 public void waitTableAvailable(byte[] table, long timeoutMillis) 2796 throws InterruptedException, IOException { 2797 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table))); 2798 } 2799 2800 public String explainTableAvailability(TableName tableName) throws IOException { 2801 StringBuilder msg = 2802 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", "); 2803 if (getHBaseCluster().getMaster().isAlive()) { 2804 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager() 2805 .getRegionStates().getRegionAssignments(); 2806 final List<Pair<RegionInfo, ServerName>> metaLocations = 2807 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName); 2808 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) { 2809 RegionInfo hri = metaLocation.getFirst(); 2810 ServerName sn = metaLocation.getSecond(); 2811 if (!assignments.containsKey(hri)) { 2812 msg.append(", region ").append(hri) 2813 .append(" not assigned, but found in meta, it expected to be on ").append(sn); 2814 } else if (sn == null) { 2815 msg.append(", region ").append(hri).append(" assigned, but has no server in meta"); 2816 } else if (!sn.equals(assignments.get(hri))) { 2817 msg.append(", region ").append(hri) 2818 .append(" assigned, but has different servers in meta and AM ( ").append(sn) 2819 .append(" <> ").append(assignments.get(hri)); 2820 } 2821 } 2822 } 2823 return msg.toString(); 2824 } 
2825 2826 public String explainTableState(final TableName table, TableState.State state) 2827 throws IOException { 2828 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); 2829 if (tableState == null) { 2830 return "TableState in META: No table state in META for table " + table 2831 + " last state in meta (including deleted is " + findLastTableState(table) + ")"; 2832 } else if (!tableState.inStates(state)) { 2833 return "TableState in META: Not " + state + " state, but " + tableState; 2834 } else { 2835 return "TableState in META: OK"; 2836 } 2837 } 2838 2839 @Nullable 2840 public TableState findLastTableState(final TableName table) throws IOException { 2841 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null); 2842 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 2843 @Override 2844 public boolean visit(Result r) throws IOException { 2845 if (!Arrays.equals(r.getRow(), table.getName())) { 2846 return false; 2847 } 2848 TableState state = CatalogFamilyFormat.getTableState(r); 2849 if (state != null) { 2850 lastTableState.set(state); 2851 } 2852 return true; 2853 } 2854 }; 2855 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, 2856 Integer.MAX_VALUE, visitor); 2857 return lastTableState.get(); 2858 } 2859 2860 /** 2861 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2862 * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent 2863 * table. 2864 * @param table the table to wait on. 2865 * @throws InterruptedException if interrupted while waiting 2866 * @throws IOException if an IO problem is encountered 2867 */ 2868 public void waitTableEnabled(TableName table) throws InterruptedException, IOException { 2869 waitTableEnabled(table, 30000); 2870 } 2871 2872 /** 2873 * Waits for a table to be 'enabled'. 
Enabled means that table is set as 'enabled' and the regions 2874 * have been all assigned. 2875 * @see #waitTableEnabled(TableName, long) 2876 * @param table Table to wait on. 2877 * @param timeoutMillis Time to wait on it being marked enabled. 2878 */ 2879 public void waitTableEnabled(byte[] table, long timeoutMillis) 2880 throws InterruptedException, IOException { 2881 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2882 } 2883 2884 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2885 waitFor(timeoutMillis, predicateTableEnabled(table)); 2886 } 2887 2888 /** 2889 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 2890 * after default period (30 seconds) 2891 * @param table Table to wait on. 2892 */ 2893 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2894 waitTableDisabled(table, 30000); 2895 } 2896 2897 public void waitTableDisabled(TableName table, long millisTimeout) 2898 throws InterruptedException, IOException { 2899 waitFor(millisTimeout, predicateTableDisabled(table)); 2900 } 2901 2902 /** 2903 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 2904 * @param table Table to wait on. 2905 * @param timeoutMillis Time to wait on it being marked disabled. 
2906 */ 2907 public void waitTableDisabled(byte[] table, long timeoutMillis) 2908 throws InterruptedException, IOException { 2909 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2910 } 2911 2912 /** 2913 * Make sure that at least the specified number of region servers are running 2914 * @param num minimum number of region servers that should be running 2915 * @return true if we started some servers 2916 */ 2917 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2918 boolean startedServer = false; 2919 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2920 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2921 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2922 startedServer = true; 2923 } 2924 2925 return startedServer; 2926 } 2927 2928 /** 2929 * Make sure that at least the specified number of region servers are running. We don't count the 2930 * ones that are currently stopping or are stopped. 2931 * @param num minimum number of region servers that should be running 2932 * @return true if we started some servers 2933 */ 2934 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2935 boolean startedServer = ensureSomeRegionServersAvailable(num); 2936 2937 int nonStoppedServers = 0; 2938 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2939 2940 HRegionServer hrs = rst.getRegionServer(); 2941 if (hrs.isStopping() || hrs.isStopped()) { 2942 LOG.info("A region server is stopped or stopping:" + hrs); 2943 } else { 2944 nonStoppedServers++; 2945 } 2946 } 2947 for (int i = nonStoppedServers; i < num; ++i) { 2948 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2949 startedServer = true; 2950 } 2951 return startedServer; 2952 } 2953 2954 /** 2955 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 
2956 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 2957 * @param c Initial configuration 2958 * @param differentiatingSuffix Suffix to differentiate this user from others. 2959 * @return A new configuration instance with a different user set into it. 2960 */ 2961 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2962 throws IOException { 2963 FileSystem currentfs = FileSystem.get(c); 2964 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2965 return User.getCurrent(); 2966 } 2967 // Else distributed filesystem. Make a new instance per daemon. Below 2968 // code is taken from the AppendTestUtil over in hdfs. 2969 String username = User.getCurrent().getName() + differentiatingSuffix; 2970 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2971 return user; 2972 } 2973 2974 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2975 throws IOException { 2976 NavigableSet<String> online = new TreeSet<>(); 2977 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2978 try { 2979 for (RegionInfo region : ProtobufUtil 2980 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2981 online.add(region.getRegionNameAsString()); 2982 } 2983 } catch (RegionServerStoppedException e) { 2984 // That's fine. 2985 } 2986 } 2987 return online; 2988 } 2989 2990 /** 2991 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests 2992 * linger. Here is the exception you'll see: 2993 * 2994 * <pre> 2995 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2996 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2997 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2998 * failed 4 times. 
Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2999 * </pre> 3000 * 3001 * @param stream A DFSClient.DFSOutputStream. 3002 */ 3003 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 3004 try { 3005 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 3006 for (Class<?> clazz : clazzes) { 3007 String className = clazz.getSimpleName(); 3008 if (className.equals("DFSOutputStream")) { 3009 if (clazz.isInstance(stream)) { 3010 Field maxRecoveryErrorCountField = 3011 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 3012 maxRecoveryErrorCountField.setAccessible(true); 3013 maxRecoveryErrorCountField.setInt(stream, max); 3014 break; 3015 } 3016 } 3017 } 3018 } catch (Exception e) { 3019 LOG.info("Could not set max recovery field", e); 3020 } 3021 } 3022 3023 /** 3024 * Uses directly the assignment manager to assign the region. and waits until the specified region 3025 * has completed assignment. 3026 * @return true if the region is assigned false otherwise. 3027 */ 3028 public boolean assignRegion(final RegionInfo regionInfo) 3029 throws IOException, InterruptedException { 3030 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3031 am.assign(regionInfo); 3032 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3033 } 3034 3035 /** 3036 * Move region to destination server and wait till region is completely moved and online 3037 * @param destRegion region to move 3038 * @param destServer destination server of the region 3039 */ 3040 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3041 throws InterruptedException, IOException { 3042 HMaster master = getMiniHBaseCluster().getMaster(); 3043 // TODO: Here we start the move. The move can take a while. 
3044 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3045 while (true) { 3046 ServerName serverName = 3047 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3048 if (serverName != null && serverName.equals(destServer)) { 3049 assertRegionOnServer(destRegion, serverName, 2000); 3050 break; 3051 } 3052 Thread.sleep(10); 3053 } 3054 } 3055 3056 /** 3057 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3058 * configuable timeout value (default is 60 seconds) This means all regions have been deployed, 3059 * master has been informed and updated hbase:meta with the regions deployed server. 3060 * @param tableName the table name 3061 */ 3062 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3063 waitUntilAllRegionsAssigned(tableName, 3064 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3065 } 3066 3067 /** 3068 * Waith until all system table's regions get assigned 3069 */ 3070 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3071 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3072 } 3073 3074 /** 3075 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3076 * timeout. This means all regions have been deployed, master has been informed and updated 3077 * hbase:meta with the regions deployed server. 3078 * @param tableName the table name 3079 * @param timeout timeout, in milliseconds 3080 */ 3081 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3082 throws IOException { 3083 if (!TableName.isMetaTableName(tableName)) { 3084 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3085 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. 
Timeout = " 3086 + timeout + "ms"); 3087 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3088 @Override 3089 public String explainFailure() throws IOException { 3090 return explainTableAvailability(tableName); 3091 } 3092 3093 @Override 3094 public boolean evaluate() throws IOException { 3095 Scan scan = new Scan(); 3096 scan.addFamily(HConstants.CATALOG_FAMILY); 3097 boolean tableFound = false; 3098 try (ResultScanner s = meta.getScanner(scan)) { 3099 for (Result r; (r = s.next()) != null;) { 3100 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3101 RegionInfo info = RegionInfo.parseFromOrNull(b); 3102 if (info != null && info.getTable().equals(tableName)) { 3103 // Get server hosting this region from catalog family. Return false if no server 3104 // hosting this region, or if the server hosting this region was recently killed 3105 // (for fault tolerance testing). 3106 tableFound = true; 3107 byte[] server = 3108 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3109 if (server == null) { 3110 return false; 3111 } else { 3112 byte[] startCode = 3113 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3114 ServerName serverName = 3115 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3116 + Bytes.toLong(startCode)); 3117 if ( 3118 !getHBaseClusterInterface().isDistributedCluster() 3119 && getHBaseCluster().isKilledRS(serverName) 3120 ) { 3121 return false; 3122 } 3123 } 3124 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3125 return false; 3126 } 3127 } 3128 } 3129 } 3130 if (!tableFound) { 3131 LOG.warn( 3132 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3133 } 3134 return tableFound; 3135 } 3136 }); 3137 } 3138 } 3139 LOG.info("All regions for table " + tableName + " assigned to meta. 
Checking AM states."); 3140 // check from the master state if we are using a mini cluster 3141 if (!getHBaseClusterInterface().isDistributedCluster()) { 3142 // So, all regions are in the meta table but make sure master knows of the assignments before 3143 // returning -- sometimes this can lag. 3144 HMaster master = getHBaseCluster().getMaster(); 3145 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3146 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3147 @Override 3148 public String explainFailure() throws IOException { 3149 return explainTableAvailability(tableName); 3150 } 3151 3152 @Override 3153 public boolean evaluate() throws IOException { 3154 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3155 return hris != null && !hris.isEmpty(); 3156 } 3157 }); 3158 } 3159 LOG.info("All regions for table " + tableName + " assigned."); 3160 } 3161 3162 /** 3163 * Do a small get/scan against one store. This is required because store has no actual methods of 3164 * querying itself, and relies on StoreScanner. 3165 */ 3166 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3167 Scan scan = new Scan(get); 3168 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3169 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3170 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3171 // readpoint 0. 3172 0); 3173 3174 List<Cell> result = new ArrayList<>(); 3175 scanner.next(result); 3176 if (!result.isEmpty()) { 3177 // verify that we are on the row we want: 3178 Cell kv = result.get(0); 3179 if (!CellUtil.matchingRows(kv, get.getRow())) { 3180 result.clear(); 3181 } 3182 } 3183 scanner.close(); 3184 return result; 3185 } 3186 3187 /** 3188 * Create region split keys between startkey and endKey 3189 * @param numRegions the number of regions to be created. it has to be greater than 3. 
3190 * @return resulting split keys 3191 */ 3192 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3193 assertTrue(numRegions > 3); 3194 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3195 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3196 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3197 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3198 return result; 3199 } 3200 3201 /** 3202 * Do a small get/scan against one store. This is required because store has no actual methods of 3203 * querying itself, and relies on StoreScanner. 3204 */ 3205 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3206 throws IOException { 3207 Get get = new Get(row); 3208 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3209 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3210 3211 return getFromStoreFile(store, get); 3212 } 3213 3214 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3215 final List<? extends Cell> actual) { 3216 final int eLen = expected.size(); 3217 final int aLen = actual.size(); 3218 final int minLen = Math.min(eLen, aLen); 3219 3220 int i = 0; 3221 while ( 3222 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3223 ) { 3224 i++; 3225 } 3226 3227 if (additionalMsg == null) { 3228 additionalMsg = ""; 3229 } 3230 if (!additionalMsg.isEmpty()) { 3231 additionalMsg = ". " + additionalMsg; 3232 } 3233 3234 if (eLen != aLen || i != minLen) { 3235 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3236 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. 
" + safeGetAsStr(actual, i) 3237 + " (length " + aLen + ")" + additionalMsg); 3238 } 3239 } 3240 3241 public static <T> String safeGetAsStr(List<T> lst, int i) { 3242 if (0 <= i && i < lst.size()) { 3243 return lst.get(i).toString(); 3244 } else { 3245 return "<out_of_range>"; 3246 } 3247 } 3248 3249 public String getRpcConnnectionURI() throws UnknownHostException { 3250 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3251 } 3252 3253 public String getZkConnectionURI() { 3254 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3255 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3256 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3257 } 3258 3259 /** 3260 * Get the zk based cluster key for this cluster. 3261 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the 3262 * connection info of a cluster. Keep here only for compatibility. 3263 * @see #getRpcConnnectionURI() 3264 * @see #getZkConnectionURI() 3265 */ 3266 @Deprecated 3267 public String getClusterKey() { 3268 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3269 + ":" 3270 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3271 } 3272 3273 /** 3274 * Creates a random table with the given parameters 3275 */ 3276 public Table createRandomTable(TableName tableName, final Collection<String> families, 3277 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3278 final int numRowsPerFlush) throws IOException, InterruptedException { 3279 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3280 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3281 + maxVersions + "\n"); 3282 3283 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3284 final int numCF = families.size(); 3285 final 
byte[][] cfBytes = new byte[numCF][]; 3286 { 3287 int cfIndex = 0; 3288 for (String cf : families) { 3289 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3290 } 3291 } 3292 3293 final int actualStartKey = 0; 3294 final int actualEndKey = Integer.MAX_VALUE; 3295 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3296 final int splitStartKey = actualStartKey + keysPerRegion; 3297 final int splitEndKey = actualEndKey - keysPerRegion; 3298 final String keyFormat = "%08x"; 3299 final Table table = createTable(tableName, cfBytes, maxVersions, 3300 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3301 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3302 3303 if (hbaseCluster != null) { 3304 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3305 } 3306 3307 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3308 3309 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3310 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3311 final byte[] row = Bytes.toBytes( 3312 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3313 3314 Put put = new Put(row); 3315 Delete del = new Delete(row); 3316 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3317 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3318 final long ts = rand.nextInt(); 3319 final byte[] qual = Bytes.toBytes("col" + iCol); 3320 if (rand.nextBoolean()) { 3321 final byte[] value = 3322 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3323 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3324 put.addColumn(cf, qual, ts, value); 3325 } else if (rand.nextDouble() < 0.8) { 3326 del.addColumn(cf, qual, ts); 3327 } else { 3328 del.addColumns(cf, qual, ts); 3329 } 3330 } 3331 3332 if (!put.isEmpty()) { 3333 mutator.mutate(put); 3334 } 3335 3336 if (!del.isEmpty()) { 3337 mutator.mutate(del); 3338 } 3339 } 3340 LOG.info("Initiating flush #" + iFlush + " for table " + 
tableName); 3341 mutator.flush(); 3342 if (hbaseCluster != null) { 3343 getMiniHBaseCluster().flushcache(table.getName()); 3344 } 3345 } 3346 mutator.close(); 3347 3348 return table; 3349 } 3350 3351 public static int randomFreePort() { 3352 return HBaseCommonTestingUtil.randomFreePort(); 3353 } 3354 3355 public static String randomMultiCastAddress() { 3356 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3357 } 3358 3359 public static void waitForHostPort(String host, int port) throws IOException { 3360 final int maxTimeMs = 10000; 3361 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3362 IOException savedException = null; 3363 LOG.info("Waiting for server at " + host + ":" + port); 3364 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3365 try { 3366 Socket sock = new Socket(InetAddress.getByName(host), port); 3367 sock.close(); 3368 savedException = null; 3369 LOG.info("Server at " + host + ":" + port + " is available"); 3370 break; 3371 } catch (UnknownHostException e) { 3372 throw new IOException("Failed to look up " + host, e); 3373 } catch (IOException e) { 3374 savedException = e; 3375 } 3376 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3377 } 3378 3379 if (savedException != null) { 3380 throw savedException; 3381 } 3382 } 3383 3384 public static int getMetaRSPort(Connection connection) throws IOException { 3385 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3386 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3387 } 3388 } 3389 3390 /** 3391 * Due to async racing issue, a region may not be in the online region list of a region server 3392 * yet, after the assignment znode is deleted and the new assignment is recorded in master. 
3393 */ 3394 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3395 final long timeout) throws IOException, InterruptedException { 3396 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3397 while (true) { 3398 List<RegionInfo> regions = getAdmin().getRegions(server); 3399 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3400 long now = EnvironmentEdgeManager.currentTime(); 3401 if (now > timeoutTime) break; 3402 Thread.sleep(10); 3403 } 3404 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3405 } 3406 3407 /** 3408 * Check to make sure the region is open on the specified region server, but not on any other one. 3409 */ 3410 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3411 final long timeout) throws IOException, InterruptedException { 3412 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3413 while (true) { 3414 List<RegionInfo> regions = getAdmin().getRegions(server); 3415 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3416 List<JVMClusterUtil.RegionServerThread> rsThreads = 3417 getHBaseCluster().getLiveRegionServerThreads(); 3418 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3419 HRegionServer rs = rsThread.getRegionServer(); 3420 if (server.equals(rs.getServerName())) { 3421 continue; 3422 } 3423 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3424 for (HRegion r : hrs) { 3425 assertTrue("Region should not be double assigned", 3426 r.getRegionInfo().getRegionId() != hri.getRegionId()); 3427 } 3428 } 3429 return; // good, we are happy 3430 } 3431 long now = EnvironmentEdgeManager.currentTime(); 3432 if (now > timeoutTime) break; 3433 Thread.sleep(10); 3434 } 3435 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3436 } 3437 3438 public HRegion createTestRegion(String tableName, 
ColumnFamilyDescriptor cd) throws IOException { 3439 TableDescriptor td = 3440 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3441 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3442 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3443 } 3444 3445 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3446 BlockCache blockCache) throws IOException { 3447 TableDescriptor td = 3448 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3449 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3450 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3451 } 3452 3453 public static void setFileSystemURI(String fsURI) { 3454 FS_URI = fsURI; 3455 } 3456 3457 /** 3458 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3459 */ 3460 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3461 return new ExplainingPredicate<IOException>() { 3462 @Override 3463 public String explainFailure() throws IOException { 3464 final RegionStates regionStates = 3465 getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 3466 return "found in transition: " + regionStates.getRegionsInTransition().toString(); 3467 } 3468 3469 @Override 3470 public boolean evaluate() throws IOException { 3471 HMaster master = getMiniHBaseCluster().getMaster(); 3472 if (master == null) return false; 3473 AssignmentManager am = master.getAssignmentManager(); 3474 if (am == null) return false; 3475 return !am.hasRegionsInTransition(); 3476 } 3477 }; 3478 } 3479 3480 /** 3481 * Returns a {@link Predicate} for checking that table is enabled 3482 */ 3483 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3484 return new ExplainingPredicate<IOException>() { 3485 
@Override 3486 public String explainFailure() throws IOException { 3487 return explainTableState(tableName, TableState.State.ENABLED); 3488 } 3489 3490 @Override 3491 public boolean evaluate() throws IOException { 3492 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3493 } 3494 }; 3495 } 3496 3497 /** 3498 * Returns a {@link Predicate} for checking that table is enabled 3499 */ 3500 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3501 return new ExplainingPredicate<IOException>() { 3502 @Override 3503 public String explainFailure() throws IOException { 3504 return explainTableState(tableName, TableState.State.DISABLED); 3505 } 3506 3507 @Override 3508 public boolean evaluate() throws IOException { 3509 return getAdmin().isTableDisabled(tableName); 3510 } 3511 }; 3512 } 3513 3514 /** 3515 * Returns a {@link Predicate} for checking that table is enabled 3516 */ 3517 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3518 return new ExplainingPredicate<IOException>() { 3519 @Override 3520 public String explainFailure() throws IOException { 3521 return explainTableAvailability(tableName); 3522 } 3523 3524 @Override 3525 public boolean evaluate() throws IOException { 3526 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3527 if (tableAvailable) { 3528 try (Table table = getConnection().getTable(tableName)) { 3529 TableDescriptor htd = table.getDescriptor(); 3530 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3531 .getAllRegionLocations()) { 3532 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3533 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3534 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3535 for (byte[] family : htd.getColumnFamilyNames()) { 3536 scan.addFamily(family); 3537 } 3538 try (ResultScanner scanner = table.getScanner(scan)) { 3539 scanner.next(); 3540 } 3541 } 
3542 } 3543 } 3544 return tableAvailable; 3545 } 3546 }; 3547 } 3548 3549 /** 3550 * Wait until no regions in transition. 3551 * @param timeout How long to wait. 3552 */ 3553 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3554 waitFor(timeout, predicateNoRegionsInTransition()); 3555 } 3556 3557 /** 3558 * Wait until no regions in transition. (time limit 15min) 3559 */ 3560 public void waitUntilNoRegionsInTransition() throws IOException { 3561 waitUntilNoRegionsInTransition(15 * 60000); 3562 } 3563 3564 /** 3565 * Wait until labels is ready in VisibilityLabelsCache. 3566 */ 3567 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3568 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3569 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3570 3571 @Override 3572 public boolean evaluate() { 3573 for (String label : labels) { 3574 if (labelsCache.getLabelOrdinal(label) == 0) { 3575 return false; 3576 } 3577 } 3578 return true; 3579 } 3580 3581 @Override 3582 public String explainFailure() { 3583 for (String label : labels) { 3584 if (labelsCache.getLabelOrdinal(label) == 0) { 3585 return label + " is not available yet"; 3586 } 3587 } 3588 return ""; 3589 } 3590 }); 3591 } 3592 3593 /** 3594 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3595 * available. 3596 * @return the list of column descriptors 3597 */ 3598 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3599 return generateColumnDescriptors(""); 3600 } 3601 3602 /** 3603 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3604 * available. 
3605 * @param prefix family names prefix 3606 * @return the list of column descriptors 3607 */ 3608 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3609 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3610 long familyId = 0; 3611 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3612 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3613 for (BloomType bloomType : BloomType.values()) { 3614 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3615 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3616 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3617 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3618 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3619 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3620 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3621 familyId++; 3622 } 3623 } 3624 } 3625 return columnFamilyDescriptors; 3626 } 3627 3628 /** 3629 * Get supported compression algorithms. 3630 * @return supported compression algorithms. 
3631 */ 3632 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3633 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3634 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3635 for (String algoName : allAlgos) { 3636 try { 3637 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3638 algo.getCompressor(); 3639 supportedAlgos.add(algo); 3640 } catch (Throwable t) { 3641 // this algo is not available 3642 } 3643 } 3644 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3645 } 3646 3647 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3648 Scan scan = new Scan().withStartRow(row); 3649 scan.setReadType(ReadType.PREAD); 3650 scan.setCaching(1); 3651 scan.setReversed(true); 3652 scan.addFamily(family); 3653 try (RegionScanner scanner = r.getScanner(scan)) { 3654 List<Cell> cells = new ArrayList<>(1); 3655 scanner.next(cells); 3656 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3657 return null; 3658 } 3659 return Result.create(cells); 3660 } 3661 } 3662 3663 private boolean isTargetTable(final byte[] inRow, Cell c) { 3664 String inputRowString = Bytes.toString(inRow); 3665 int i = inputRowString.indexOf(HConstants.DELIMITER); 3666 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3667 int o = outputRowString.indexOf(HConstants.DELIMITER); 3668 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3669 } 3670 3671 /** 3672 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3673 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3674 * kerby KDC server and utility for using it, 3675 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 3676 * less baggage. It came in in HBASE-5291. 
3677 */ 3678 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3679 Properties conf = MiniKdc.createConf(); 3680 conf.put(MiniKdc.DEBUG, true); 3681 MiniKdc kdc = null; 3682 File dir = null; 3683 // There is time lag between selecting a port and trying to bind with it. It's possible that 3684 // another service captures the port in between which'll result in BindException. 3685 boolean bindException; 3686 int numTries = 0; 3687 do { 3688 try { 3689 bindException = false; 3690 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3691 kdc = new MiniKdc(conf, dir); 3692 kdc.start(); 3693 } catch (BindException e) { 3694 FileUtils.deleteDirectory(dir); // clean directory 3695 numTries++; 3696 if (numTries == 3) { 3697 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3698 throw e; 3699 } 3700 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3701 bindException = true; 3702 } 3703 } while (bindException); 3704 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3705 return kdc; 3706 } 3707 3708 public int getNumHFiles(final TableName tableName, final byte[] family) { 3709 int numHFiles = 0; 3710 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3711 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3712 } 3713 return numHFiles; 3714 } 3715 3716 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3717 final byte[] family) { 3718 int numHFiles = 0; 3719 for (Region region : rs.getRegions(tableName)) { 3720 numHFiles += region.getStore(family).getStorefilesCount(); 3721 } 3722 return numHFiles; 3723 } 3724 3725 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3726 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3727 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3728 
Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3729 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3730 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3731 it2 = rtdFamilies.iterator(); it.hasNext();) { 3732 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3733 } 3734 } 3735 3736 /** 3737 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3738 * invocations. 3739 */ 3740 public static void await(final long sleepMillis, final BooleanSupplier condition) 3741 throws InterruptedException { 3742 try { 3743 while (!condition.getAsBoolean()) { 3744 Thread.sleep(sleepMillis); 3745 } 3746 } catch (RuntimeException e) { 3747 if (e.getCause() instanceof AssertionError) { 3748 throw (AssertionError) e.getCause(); 3749 } 3750 throw e; 3751 } 3752 } 3753}