/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around for the duration of your HBase testing.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need while testing. It
 * manages at most one cluster at a time. The managed cluster can be an in-process
 * {@link SingleProcessHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
 * Not all methods work with the real cluster.
 * <p/>
 * Depends on log4j being on the classpath and on hbase-site.xml for logging and test-run
 * configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, the default values for master-info-port and region-server-port
 * are overridden such that a random port will be assigned (thus avoiding port contention if
 * another local HBase instance is already running).
 * <p/>
 * To preserve test data directories, set the system property "hbase.testing.preserve.testdir" to
 * true.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  /**
   * System property key to get test directory value. The name is as it is because mini dfs has
   * hard-codings that put test data here. It should NOT be used directly in HBase, as it's a
   * property used by mini dfs.
   * @deprecated since 2.0.0 and will be removed in 3.0.0. Can be used only with mini dfs.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-19410">HBASE-19410</a>
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
  public static final boolean PRESPLIT_TEST_TABLE = true;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** Whether a mini cluster is running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on the test filesystem where we put the data for this instance of HBaseTestingUtil
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized on memstore timestamps and tags (boolean pairs). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /**
   * Checks to see if a specific port is available.
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create combinations of memstoreTS and tags
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();
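
  /*
   * Usage sketch (hypothetical JUnit test, not part of this class): the combination lists above
   * are intended as JUnit parameter sources, e.g.:
   *
   *   @RunWith(Parameterized.class)
   *   public class MyBloomCompressionTest {
   *     @Parameterized.Parameters
   *     public static Collection<Object[]> data() {
   *       return HBaseTestingUtil.BLOOM_AND_COMPRESSION_COMBINATIONS;
   *     }
   *
   *     public MyBloomCompressionTest(Compression.Algorithm comprAlgo, BloomType bloomType) {
   *       // each run of the test class gets one (compression, bloom) pair
   *     }
   *   }
   */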

  /**
   * <p>
   * Create an HBaseTestingUtility using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()},
   * tmp data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtility using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()},
   * tmp data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // an hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, set it to true, thus making random port
    // assignment the default that tests must explicitly opt out of
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is
   * keyed by the Configuration. If say, a Connection was being used against a cluster that had
   * been shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be
   * wholesome. Rather than using the returned Configuration directly, it is usually best to make a
   * copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }
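
  /*
   * Usage sketch (hypothetical test, not part of this class): a region opened directly in the
   * test JVM should be closed together with its WAL so no file handles leak between tests.
   *
   *   HRegion region = ...; // e.g. obtained from a createLocalHRegion-style helper
   *   try {
   *     // exercise the region
   *   } finally {
   *     HBaseTestingUtil.closeRegionAndWAL(region);
   *   }
   */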

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. It needs to amend the
   * {@link #TEST_DIRECTORY_KEY} System property, as it's what minidfscluster bases its data dir
   * on. Modifying a System property is not the way to do concurrent instances -- another instance
   * could grab the temporary value unintentionally -- but there is nothing we can do about it at
   * the moment; single instance only is how the minidfscluster works. We also create the
   * underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir,
   * and set the values in the conf, and as a system property for hadoop.tmp.dir (we do not create
   * them!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {
    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; returns the working directory for
   *         the test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test
   * relies on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }
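
  /*
   * Usage sketch (hypothetical test): stage test data under the per-test directory on the test
   * filesystem rather than under /tmp, so it is cleaned up with the rest of the test dirs.
   *
   *   Path input = util.getDataTestDirOnTestFS("input");
   *   FileSystem fs = util.getTestFileSystem();
   *   try (FSDataOutputStream out = fs.create(new Path(input, "part-0"))) {
   *     out.writeBytes("row1\n");
   *   }
   */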

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}, to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test
   * relies on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already setup.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }
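
  /*
   * Usage sketch (hypothetical test): bring up HDFS only, work in the per-test directory, then
   * tear the dfs cluster down.
   *
   *   HBaseTestingUtil util = new HBaseTestingUtil();
   *   util.startMiniDFSCluster(3);
   *   try {
   *     Path dir = util.getDataTestDirOnTestFS("my-test");
   *     util.getTestFileSystem().mkdirs(dir);
   *   } finally {
   *     util.shutdownMiniDFSCluster();
   *   }
   */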

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; this might be because your Hadoop version is newer than 3.2.3 or 3.3.4, "
            + "see HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   * This allows specifying this parameter on the command line. If not set, the default is false.
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable short circuit reads, unless configured differently. Sets both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }
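
  /*
   * Usage sketch (hypothetical, assuming the JVM user may read DN block files): short circuit
   * reads are off by default; a test run can opt in from the command line before the mini
   * cluster starts, e.g.
   *
   *   mvn test -Dhbase.tests.use.shortcircuit.reads=true -Dtest=MyScanTest
   *
   * or programmatically, before startMiniCluster():
   *
   *   System.setProperty("hbase.tests.use.shortcircuit.reads", "true");
   */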

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)}, or does
   * nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster was set through the setDFSCluster method, in which
      // case we will not have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves number of slave nodes, used for both HBase region servers and HDFS data
   *                  nodes.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper, all using default options. Option default
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies the
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under the System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }
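
  /*
   * Usage sketch (hypothetical test lifecycle): start a full mini cluster with explicit sizing,
   * then shut everything down when the test class finishes.
   *
   *   private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
   *
   *   @BeforeClass
   *   public static void setUp() throws Exception {
   *     StartTestingClusterOption option = StartTestingClusterOption.builder()
   *       .numMasters(1).numRegionServers(3).numDataNodes(3).build();
   *     UTIL.startMiniCluster(option);
   *   }
   *
   *   @AfterClass
   *   public static void tearDown() throws Exception {
   *     UTIL.shutdownMiniCluster();
   *   }
   */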

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from the mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up the mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up the mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass)
      .rsPorts(rsPorts).createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if
   * you want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Connection conn = ConnectionFactory.createConnection(this.conf);
      Table t = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      while (s.next() != null) {
        // do nothing
      }
    }
    LOG.info("HBase has been restarted");
  }

  /**
   * Returns the current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops the mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }
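
  /*
   * Usage sketch (hypothetical test): bounce only HBase between test phases, keeping the dfs and
   * zk mini clusters (and therefore the data) alive.
   *
   *   util.shutdownMiniHBaseCluster();
   *   // ... optionally change configuration here ...
   *   util.restartHBaseCluster(3);
   */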

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is
   * true, a new root directory path is fetched irrespective of whether it has been fetched before
   * or not. If false, the previous path is used. Note: this does not cause the root dir to be
   * created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory. Also creates the hbase version file.
   * Normally you won't make use of this method; the root hbasedir is created for you as part of
   * mini cluster startup. You'd only use this method if you were doing manual operations.
   * @param create This flag decides whether a new root or data directory path is fetched even if
   *               one has been fetched before. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that
   * <code>create</code> flag is false.
   * @return Fully qualified path to the hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }
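
  /*
   * Usage sketch (hypothetical manual setup): when assembling a cluster by hand instead of via
   * startMiniCluster(), create the root dir and WAL dir (see createWALRootDir below) explicitly
   * before starting hbase.
   *
   *   util.startMiniDFSCluster(1);
   *   util.startMiniZKCluster();
   *   util.createRootDir(true);
   *   util.createWALRootDir();
   *   util.startMiniHBaseCluster();
   */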

  /**
   * Creates an hbase walDir in the user's home directory. Normally you won't make use of this
   * method; the root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operations.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }
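
  /*
   * Usage sketch (hypothetical test): create a table, load a row, and force it to disk via the
   * flush() helper above.
   *
   *   try (Table table = util.createTable(TableName.valueOf("demo"), "cf")) {
   *     table.put(new Put(Bytes.toBytes("r1"))
   *       .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   *     util.flush(TableName.valueOf("demo"));
   *   }
   */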

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException if an IO problem is encountered
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  public Table createTable(TableName tableName, byte[][] families, int numVersions,
    byte[] startKey, byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we
    // should wait until they are assigned
    waitUntilAllRegionsAssigned(tableName);
    return getConnection().getTable(tableName);
  }

  /**
   * Create a table.
   * @param c Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
    throws IOException {
    return createTable(htd, families, null, c);
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) but we disable them here because
    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
    // on is interfering.
    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
  }
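
  /*
   * Usage sketch (hypothetical test): a pre-split table spreads load across region servers from
   * the first put, which helps when a test asserts on per-region behavior.
   *
   *   byte[][] families = { Bytes.toBytes("cf") };
   *   try (Table table = util.createMultiRegionTable(TableName.valueOf("demo"), families)) {
   *     // the table now has one region per split key in KEYS_FOR_HBA_CREATE_TABLE
   *   }
   */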

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param families  array of column families
   * @param splitKeys array of split keys
   * @param type      Bloom type
   * @param blockSize block size
   * @param c         Configuration to use
   * @return A Table instance for the created table.
   * @throws IOException if getAdmin or createTable fails
   */
  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
    BloomType type, int blockSize, Configuration c) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    for (byte[] family : families) {
      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
        .setBloomFilterType(type).setBlocksize(blockSize);
      if (isNewVersionBehaviorEnabled()) {
        cfdb.setNewVersionBehavior(true);
      }
      builder.setColumnFamily(cfdb.build());
    }
    TableDescriptor td = builder.build();
    if (splitKeys != null) {
      getAdmin().createTable(td, splitKeys);
    } else {
      getAdmin().createTable(td);
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(td.getTableName());
    return getConnection().getTable(td.getTableName());
  }

  /**
   * Create a table.
   * @param htd       table descriptor
   * @param splitRows array of split keys
   * @return A Table instance for the created table.
   */
  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
    if (isNewVersionBehaviorEnabled()) {
      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
        builder.setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
      }
    }
    if (splitRows != null) {
      getAdmin().createTable(builder.build(), splitRows);
    } else {
      getAdmin().createTable(builder.build());
    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
    waitUntilAllRegionsAssigned(htd.getTableName());
    return getConnection().getTable(htd.getTableName());
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the replica count
   * @param c            Configuration to use
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount, final Configuration c) throws IOException {
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
    return createTable(htd, families, splitKeys, c);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family, int numVersions)
    throws IOException {
    return createTable(tableName, new byte[][] { family }, numVersions);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, int numVersions)
    throws IOException {
    return createTable(tableName, families, numVersions, (byte[][]) null);
  }
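
  /*
   * Usage sketch (hypothetical test): a multi-version family keeps older cell versions around,
   * which a Get can then retrieve.
   *
   *   try (Table table = util.createTable(TableName.valueOf("demo"), Bytes.toBytes("cf"), 3)) {
   *     byte[] row = Bytes.toBytes("r1"), cf = Bytes.toBytes("cf"), q = Bytes.toBytes("q");
   *     table.put(new Put(row).addColumn(cf, q, Bytes.toBytes("v1")));
   *     table.put(new Put(row).addColumn(cf, q, Bytes.toBytes("v2")));
   *     Result r = table.get(new Get(row).readVersions(3));
   *   }
   */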
1421 */ 1422 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1423 byte[][] splitKeys) throws IOException { 1424 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1425 for (byte[] family : families) { 1426 ColumnFamilyDescriptorBuilder cfBuilder = 1427 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1428 if (isNewVersionBehaviorEnabled()) { 1429 cfBuilder.setNewVersionBehavior(true); 1430 } 1431 builder.setColumnFamily(cfBuilder.build()); 1432 } 1433 if (splitKeys != null) { 1434 getAdmin().createTable(builder.build(), splitKeys); 1435 } else { 1436 getAdmin().createTable(builder.build()); 1437 } 1438 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1439 // assigned 1440 waitUntilAllRegionsAssigned(tableName); 1441 return getConnection().getTable(tableName); 1442 } 1443 1444 /** 1445 * Create a table with multiple regions. 1446 * @return A Table instance for the created table. 1447 */ 1448 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1449 throws IOException { 1450 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1451 } 1452 1453 /** 1454 * Create a table. 1455 * @return A Table instance for the created table. 1456 */ 1457 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1458 throws IOException { 1459 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1460 for (byte[] family : families) { 1461 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1462 .setMaxVersions(numVersions).setBlocksize(blockSize); 1463 if (isNewVersionBehaviorEnabled()) { 1464 cfBuilder.setNewVersionBehavior(true); 1465 } 1466 builder.setColumnFamily(cfBuilder.build()); 1467 } 1468 getAdmin().createTable(builder.build()); 1469 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1470 // assigned 1471 waitUntilAllRegionsAssigned(tableName); 1472 return getConnection().getTable(tableName); 1473 } 1474 1475 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1476 String cpName) throws IOException { 1477 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1478 for (byte[] family : families) { 1479 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1480 .setMaxVersions(numVersions).setBlocksize(blockSize); 1481 if (isNewVersionBehaviorEnabled()) { 1482 cfBuilder.setNewVersionBehavior(true); 1483 } 1484 builder.setColumnFamily(cfBuilder.build()); 1485 } 1486 if (cpName != null) { 1487 builder.setCoprocessor(cpName); 1488 } 1489 getAdmin().createTable(builder.build()); 1490 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1491 // assigned 1492 waitUntilAllRegionsAssigned(tableName); 1493 return getConnection().getTable(tableName); 1494 } 1495 1496 /** 1497 * Create a table. 1498 * @return A Table instance for the created table. 
1499 */ 1500 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1501 throws IOException { 1502 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1503 int i = 0; 1504 for (byte[] family : families) { 1505 ColumnFamilyDescriptorBuilder cfBuilder = 1506 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1507 if (isNewVersionBehaviorEnabled()) { 1508 cfBuilder.setNewVersionBehavior(true); 1509 } 1510 builder.setColumnFamily(cfBuilder.build()); 1511 i++; 1512 } 1513 getAdmin().createTable(builder.build()); 1514 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1515 // assigned 1516 waitUntilAllRegionsAssigned(tableName); 1517 return getConnection().getTable(tableName); 1518 } 1519 1520 /** 1521 * Create a table. 1522 * @return A Table instance for the created table. 1523 */ 1524 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1525 throws IOException { 1526 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1527 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1528 if (isNewVersionBehaviorEnabled()) { 1529 cfBuilder.setNewVersionBehavior(true); 1530 } 1531 builder.setColumnFamily(cfBuilder.build()); 1532 getAdmin().createTable(builder.build(), splitRows); 1533 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1534 // assigned 1535 waitUntilAllRegionsAssigned(tableName); 1536 return getConnection().getTable(tableName); 1537 } 1538 1539 /** 1540 * Create a table with multiple regions. 1541 * @return A Table instance for the created table. 1542 */ 1543 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1544 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1545 } 1546 1547 /** 1548 * Set the number of Region replicas. 1549 */ 1550 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1551 throws IOException, InterruptedException { 1552 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1553 .setRegionReplication(replicaCount).build(); 1554 admin.modifyTable(desc); 1555 } 1556 1557 /** 1558 * Set the number of Region replicas. 
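 * <p>
 * A minimal usage sketch (the Admin handle and table name are illustrative):
 * <pre>
 * setReplicas(admin, TableName.valueOf("testTable"), 3);
 * </pre>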
1559 */ 1560 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1561 throws ExecutionException, IOException, InterruptedException { 1562 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1563 .setRegionReplication(replicaCount).build(); 1564 admin.modifyTable(desc).get(); 1565 } 1566 1567 /** 1568 * Drop an existing table 1569 * @param tableName existing table 1570 */ 1571 public void deleteTable(TableName tableName) throws IOException { 1572 try { 1573 getAdmin().disableTable(tableName); 1574 } catch (TableNotEnabledException e) { 1575 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1576 } 1577 getAdmin().deleteTable(tableName); 1578 } 1579 1580 /** 1581 * Drop an existing table 1582 * @param tableName existing table 1583 */ 1584 public void deleteTableIfAny(TableName tableName) throws IOException { 1585 try { 1586 deleteTable(tableName); 1587 } catch (TableNotFoundException e) { 1588 // ignore 1589 } 1590 } 1591 1592 // ========================================================================== 1593 // Canned table and table descriptor creation 1594 1595 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1596 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1597 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1598 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1599 private static final int MAXVERSIONS = 3; 1600 1601 public static final char FIRST_CHAR = 'a'; 1602 public static final char LAST_CHAR = 'z'; 1603 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1604 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1605 1606 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1607 return createModifyableTableDescriptor(TableName.valueOf(name), 1608 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1609 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1610 } 1611 1612 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1613 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1614 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1615 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1616 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1617 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1618 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1619 if (isNewVersionBehaviorEnabled()) { 1620 cfBuilder.setNewVersionBehavior(true); 1621 } 1622 builder.setColumnFamily(cfBuilder.build()); 1623 } 1624 return builder.build(); 1625 } 1626 1627 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1628 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1629 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1630 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1631 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1632 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1633 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1634 if (isNewVersionBehaviorEnabled()) { 1635 cfBuilder.setNewVersionBehavior(true); 1636 } 1637 builder.setColumnFamily(cfBuilder.build()); 
  }
  return builder;
}

/**
 * Create a table descriptor of name <code>name</code>.
 * @param name Name to give table.
 * @return Table descriptor.
 */
public TableDescriptor createTableDescriptor(final TableName name) {
  return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
    MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
}

public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
  return createTableDescriptor(tableName, new byte[][] { family }, 1);
}

public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
  int maxVersions) {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  for (byte[] family : families) {
    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
    if (isNewVersionBehaviorEnabled()) {
      cfBuilder.setNewVersionBehavior(true);
    }
    builder.setColumnFamily(cfBuilder.build());
  }
  return builder.build();
}

/**
 * Create an HRegion that writes to the local tmp dirs.
 * @param desc a table descriptor indicating which table the region belongs to
 * @param startKey the start boundary of the region
 * @param endKey the end boundary of the region
 * @return a region that writes to local dir for testing
 */
public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
  throws IOException {
  RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
    .setEndKey(endKey).build();
  return createLocalHRegion(hri, desc);
}

/**
 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
 */
public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
  return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
}

/**
 * Create an HRegion that writes to the local tmp dirs with the specified WAL.
 * @param info region info
 * @param conf configuration
 * @param desc table descriptor
 * @param wal WAL for this region.
 * @return the created HRegion
 */
public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
  WAL wal) throws IOException {
  ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
    MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
}

/**
 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
 *         when done.
 */
public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
  Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
  throws IOException {
  return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
    durability, wal, null, families);
}

public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
  byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
  boolean[] compactedMemStore, byte[]... families) throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  builder.setReadOnly(isReadOnly);
  int i = 0;
  for (byte[] family : families) {
    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
    if (compactedMemStore != null && i < compactedMemStore.length) {
      cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
    } else {
      cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
    }
    i++;
    // Retain all versions; some tests depend on reading back every cell written.
    cfBuilder.setMaxVersions(Integer.MAX_VALUE);
    builder.setColumnFamily(cfBuilder.build());
  }
  builder.setDurability(durability);
  RegionInfo info =
    RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
  return createLocalHRegion(info, conf, builder.build(), wal);
}

//
// ==========================================================================

/**
 * Provide an existing table name to truncate. Scans the table and issues a delete for each row
 * read.
 * @param tableName existing table
 * @return Table for the given table name
 */
public Table deleteTableData(TableName tableName) throws IOException {
  Table table = getConnection().getTable(tableName);
  Scan scan = new Scan();
  // Use try-with-resources so the scanner is always closed instead of being leaked.
  try (ResultScanner resScan = table.getScanner(scan)) {
    for (Result res : resScan) {
      Delete del = new Delete(res.getRow());
      table.delete(del);
    }
  }
  return table;
}

/**
 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
 * table.
 * @param tableName table which must exist.
 * @param preserveRegions keep the existing split points
 * @return Table for the new table
 */
public Table truncateTable(final TableName tableName, final boolean preserveRegions)
  throws IOException {
  Admin admin = getAdmin();
  if (!admin.isTableDisabled(tableName)) {
    admin.disableTable(tableName);
  }
  admin.truncateTable(tableName, preserveRegions);
  return getConnection().getTable(tableName);
}

/**
 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
 * table. For the previous behavior of issuing row deletes, see deleteTableData. Expressly does
 * not preserve regions of the existing table.
 * @param tableName table which must exist.
 * @return Table for the new table
 */
public Table truncateTable(final TableName tableName) throws IOException {
  return truncateTable(tableName, false);
}

/**
 * Load table with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Family
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[] f) throws IOException {
  return loadTable(t, new byte[][] { f });
}

/**
 * Load table with rows from 'aaa' to 'zzz'.
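 * <p>
 * A minimal usage sketch (table and family names are illustrative):
 * <pre>
 * Table t = createMultiRegionTable(TableName.valueOf("testLoad"), Bytes.toBytes("f"));
 * int rows = loadTable(t, Bytes.toBytes("f"), false); // false = skip the WAL
 * </pre>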
 * @param t Table
 * @param f Family
 * @param writeToWAL if false, the puts are applied with {@link Durability#SKIP_WAL}
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
  return loadTable(t, new byte[][] { f }, null, writeToWAL);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f) throws IOException {
  return loadTable(t, f, null);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @param value the values of the cells. If null is passed, the row key is used as value
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
  return loadTable(t, f, value, true);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @param value the values of the cells. If null is passed, the row key is used as value
 * @param writeToWAL if false, the puts are applied with {@link Durability#SKIP_WAL}
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
  throws IOException {
  List<Put> puts = new ArrayList<>();
  for (byte[] row : HBaseTestingUtil.ROWS) {
    Put put = new Put(row);
    put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
    for (int i = 0; i < f.length; i++) {
      byte[] value1 = value != null ? value : row;
      put.addColumn(f[i], f[i], value1);
    }
    puts.add(put);
  }
  t.put(puts);
  return puts.size();
}

/**
 * A tracker for validating table rows generated with
 * {@link HBaseTestingUtil#loadTable(Table, byte[])}
 */
public static class SeenRowTracker {
  int dim = 'z' - 'a' + 1;
  int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
  byte[] startRow;
  byte[] stopRow;

  public SeenRowTracker(byte[] startRow, byte[] stopRow) {
    this.startRow = startRow;
    this.stopRow = stopRow;
  }

  void reset() {
    for (byte[] row : ROWS) {
      seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
    }
  }

  int i(byte b) {
    return b - 'a';
  }

  public void addRow(byte[] row) {
    seenRows[i(row[0])][i(row[1])][i(row[2])]++;
  }

  /**
   * Validate that all the rows between startRow and stopRow are seen exactly once, and that no
   * other rows are seen at all.
   */
  public void validate() {
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          int count = seenRows[i(b1)][i(b2)][i(b3)];
          int expectedCount = 0;
          if (
            Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
              && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
          ) {
            expectedCount = 1;
          }
          if (count != expectedCount) {
            String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
            throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
              + "instead of " + expectedCount);
          }
        }
      }
    }
  }
}

public int loadRegion(final HRegion r, final byte[] f) throws IOException {
  return
loadRegion(r, f, false); 1915 } 1916 1917 public int loadRegion(final Region r, final byte[] f) throws IOException { 1918 return loadRegion((HRegion) r, f); 1919 } 1920 1921 /** 1922 * Load region with rows from 'aaa' to 'zzz'. 1923 * @param r Region 1924 * @param f Family 1925 * @param flush flush the cache if true 1926 * @return Count of rows loaded. 1927 */ 1928 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1929 byte[] k = new byte[3]; 1930 int rowCount = 0; 1931 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1932 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1933 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1934 k[0] = b1; 1935 k[1] = b2; 1936 k[2] = b3; 1937 Put put = new Put(k); 1938 put.setDurability(Durability.SKIP_WAL); 1939 put.addColumn(f, null, k); 1940 if (r.getWAL() == null) { 1941 put.setDurability(Durability.SKIP_WAL); 1942 } 1943 int preRowCount = rowCount; 1944 int pause = 10; 1945 int maxPause = 1000; 1946 while (rowCount == preRowCount) { 1947 try { 1948 r.put(put); 1949 rowCount++; 1950 } catch (RegionTooBusyException e) { 1951 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1952 Threads.sleep(pause); 1953 } 1954 } 1955 } 1956 } 1957 if (flush) { 1958 r.flush(true); 1959 } 1960 } 1961 return rowCount; 1962 } 1963 1964 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1965 throws IOException { 1966 for (int i = startRow; i < endRow; i++) { 1967 byte[] data = Bytes.toBytes(String.valueOf(i)); 1968 Put put = new Put(data); 1969 put.addColumn(f, null, data); 1970 t.put(put); 1971 } 1972 } 1973 1974 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1975 throws IOException { 1976 for (int i = 0; i < totalRows; i++) { 1977 byte[] row = new byte[rowSize]; 1978 Bytes.random(row); 1979 Put put = new Put(row); 1980 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 1981 t.put(put); 1982 } 1983 } 1984 1985 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 1986 int replicaId) throws IOException { 1987 for (int i = startRow; i < endRow; i++) { 1988 String failMsg = "Failed verification of row :" + i; 1989 byte[] data = Bytes.toBytes(String.valueOf(i)); 1990 Get get = new Get(data); 1991 get.setReplicaId(replicaId); 1992 get.setConsistency(Consistency.TIMELINE); 1993 Result result = table.get(get); 1994 assertTrue(failMsg, result.containsColumn(f, null)); 1995 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 1996 Cell cell = result.getColumnLatestCell(f, null); 1997 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 1998 cell.getValueOffset(), cell.getValueLength())); 1999 } 2000 } 2001 2002 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2003 throws IOException { 2004 verifyNumericRows((HRegion) region, f, startRow, endRow); 2005 } 2006 2007 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2008 throws IOException { 2009 verifyNumericRows(region, f, startRow, endRow, true); 2010 } 2011 2012 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2013 final boolean present) throws IOException { 2014 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2015 } 2016 2017 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2018 final boolean present) throws IOException { 2019 for (int i = startRow; i < endRow; i++) { 2020 String failMsg = "Failed 
verification of row :" + i; 2021 byte[] data = Bytes.toBytes(String.valueOf(i)); 2022 Result result = region.get(new Get(data)); 2023 2024 boolean hasResult = result != null && !result.isEmpty(); 2025 assertEquals(failMsg + result, present, hasResult); 2026 if (!present) continue; 2027 2028 assertTrue(failMsg, result.containsColumn(f, null)); 2029 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2030 Cell cell = result.getColumnLatestCell(f, null); 2031 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2032 cell.getValueOffset(), cell.getValueLength())); 2033 } 2034 } 2035 2036 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2037 throws IOException { 2038 for (int i = startRow; i < endRow; i++) { 2039 byte[] data = Bytes.toBytes(String.valueOf(i)); 2040 Delete delete = new Delete(data); 2041 delete.addFamily(f); 2042 t.delete(delete); 2043 } 2044 } 2045 2046 /** 2047 * Return the number of rows in the given table. 2048 * @param table to count rows 2049 * @return count of rows 2050 */ 2051 public static int countRows(final Table table) throws IOException { 2052 return countRows(table, new Scan()); 2053 } 2054 2055 public static int countRows(final Table table, final Scan scan) throws IOException { 2056 try (ResultScanner results = table.getScanner(scan)) { 2057 int count = 0; 2058 while (results.next() != null) { 2059 count++; 2060 } 2061 return count; 2062 } 2063 } 2064 2065 public static int countRows(final Table table, final byte[]... families) throws IOException { 2066 Scan scan = new Scan(); 2067 for (byte[] family : families) { 2068 scan.addFamily(family); 2069 } 2070 return countRows(table, scan); 2071 } 2072 2073 /** 2074 * Return the number of rows in the given table. 2075 */ 2076 public int countRows(final TableName tableName) throws IOException { 2077 try (Table table = getConnection().getTable(tableName)) { 2078 return countRows(table); 2079 } 2080 } 2081 2082 public static int countRows(final Region region) throws IOException { 2083 return countRows(region, new Scan()); 2084 } 2085 2086 public static int countRows(final Region region, final Scan scan) throws IOException { 2087 try (InternalScanner scanner = region.getScanner(scan)) { 2088 return countRows(scanner); 2089 } 2090 } 2091 2092 public static int countRows(final InternalScanner scanner) throws IOException { 2093 int scannedCount = 0; 2094 List<Cell> results = new ArrayList<>(); 2095 boolean hasMore = true; 2096 while (hasMore) { 2097 hasMore = scanner.next(results); 2098 scannedCount += results.size(); 2099 results.clear(); 2100 } 2101 return scannedCount; 2102 } 2103 2104 /** 2105 * Return an md5 digest of the entire contents of a table. 
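 * <p>
 * Intended for comparing table contents, e.g. (sketch):
 * <pre>
 * String before = checksumRows(table);
 * // ... run the operation under test ...
 * assertEquals(before, checksumRows(table));
 * </pre>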
 */
public String checksumRows(final Table table) throws Exception {
  MessageDigest digest = MessageDigest.getInstance("MD5");
  try (ResultScanner results = table.getScanner(new Scan())) {
    for (Result res : results) {
      digest.update(res.getRow());
    }
  }
  // Hex-encode the digest bytes; MessageDigest#toString() does not render the digest itself,
  // so returning it would make every table's "checksum" compare equal.
  return Bytes.toHex(digest.digest());
}

/** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
static {
  int i = 0;
  for (byte b1 = 'a'; b1 <= 'z'; b1++) {
    for (byte b2 = 'a'; b2 <= 'z'; b2++) {
      for (byte b3 = 'a'; b3 <= 'z'; b3++) {
        ROWS[i][0] = b1;
        ROWS[i][1] = b2;
        ROWS[i][2] = b3;
        i++;
      }
    }
  }
}

public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
  Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
  Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
  Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
  Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
  Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
  Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };

public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
  Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
  Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
  Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
  Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
  Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
  Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };

/**
 * Create rows in hbase:meta for regions of the specified table with the specified start keys.
 * The first startKey should be a 0 length byte array if you want to form a proper range of
 * regions.
 * @return list of region info for regions added to meta
 */
public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
  final TableDescriptor htd, byte[][] startKeys) throws IOException {
  try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
    MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
      TableState.State.ENABLED);
    // add custom ones
    for (int i = 0; i < startKeys.length; i++) {
      int j = (i + 1) % startKeys.length;
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
        .setEndKey(startKeys[j]).build();
      MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
      newRegions.add(hri);
    }
    return newRegions;
  }
}

/**
 * Create an unmanaged WAL. Be sure to close it when you're through.
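 * <p>
 * Typical use, paired with a region (a sketch; the setup names are illustrative):
 * <pre>
 * WAL wal = createWal(conf, rootDir, regionInfo);
 * try {
 *   HRegion region = HRegion.createHRegion(regionInfo, rootDir, conf, tableDescriptor, wal);
 *   // ... exercise the region ...
 * } finally {
 *   wal.close();
 * }
 * </pre>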
 */
public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
  throws IOException {
  // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
  // unless we pass it along via the conf.
  Configuration confForWAL = new Configuration(conf);
  confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
  return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd) throws IOException {
  return createRegionAndWAL(info, rootDir, conf, htd, true);
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
  HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
  region.setBlockCache(blockCache);
  region.initialize();
  return region;
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
  throws IOException {
  HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
  region.setMobFileCache(mobFileCache);
  region.initialize();
  return region;
}

/**
 * Create a region with its own WAL. Be sure to call
 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
 */
public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
  final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
  ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
    MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  WAL wal = createWal(conf, rootDir, info);
  return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
}

/**
 * Find any other region server which is different from the one identified by the parameter.
 * @return another region server
 */
public HRegionServer getOtherRegionServer(HRegionServer rs) {
  for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
    if (rst.getRegionServer() != rs) {
      return rst.getRegionServer();
    }
  }
  return null;
}

/**
 * Tool to get the reference to the region server object that holds the region of the specified
 * user table.
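 * <p>
 * For example, to restart the server carrying the first region of a table (sketch):
 * <pre>
 * HRegionServer rs = getRSForFirstRegionInTable(tableName);
 * getMiniHBaseCluster().killRegionServer(rs.getServerName());
 * </pre>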
 * @param tableName user table to lookup in hbase:meta
 * @return region server that holds it, null if the row doesn't exist
 */
public HRegionServer getRSForFirstRegionInTable(TableName tableName)
  throws IOException, InterruptedException {
  List<RegionInfo> regions = getAdmin().getRegions(tableName);
  if (regions == null || regions.isEmpty()) {
    return null;
  }
  LOG.debug("Found " + regions.size() + " regions for table " + tableName);

  byte[] firstRegionName =
    regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
      .orElseThrow(() -> new IOException("online regions not found in table " + tableName));

  LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
  long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
    HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  // HBASE_CLIENT_PAUSE is expressed in milliseconds, so use MILLISECONDS for the sleep unit.
  RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
  while (retrier.shouldRetry()) {
    int index = getMiniHBaseCluster().getServerWith(firstRegionName);
    if (index != -1) {
      return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
    }
    // Came back -1. Region may not be online yet. Sleep a while.
    retrier.sleepUntilNextRetry();
  }
  return null;
}

/**
 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
 * @throws IOException When starting the cluster fails.
 */
public MiniMRCluster startMiniMapReduceCluster() throws IOException {
  // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
  conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
    "99.0");
  startMiniMapReduceCluster(2);
  return mrCluster;
}

/**
 * TaskTracker has a bug where changing the hadoop.log.dir system property will not change its
 * internal static LOG_DIR variable.
 */
private void forceChangeTaskLogDir() {
  Field logDirField;
  try {
    logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
    logDirField.setAccessible(true);

    Field modifiersField = ReflectionUtils.getModifiersField();
    modifiersField.setAccessible(true);
    modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);

    logDirField.set(null, new File(hadoopLogDir, "userlogs"));
  } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
    | IllegalAccessException e) {
    throw new RuntimeException(e);
  }
}

/**
 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
 * filesystem.
 * @param servers The number of <code>TaskTracker</code>s to start.
 * @throws IOException When starting the cluster fails.
 */
private void startMiniMapReduceCluster(final int servers) throws IOException {
  if (mrCluster != null) {
    throw new IllegalStateException("MiniMRCluster is already running");
  }
  LOG.info("Starting mini mapreduce cluster...");
  setupClusterTestDir();
  createDirsAndSetProperties();

  forceChangeTaskLogDir();

  //// hadoop2 specific settings
  // Tests were failing because this process used 6GB of virtual memory and was getting killed.
  // We raise the allowed virtual memory usage so that processes don't get killed.
  conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);

  // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
  // this avoids the problem by disabling speculative task execution in tests.
  conf.setBoolean("mapreduce.map.speculative", false);
  conf.setBoolean("mapreduce.reduce.speculative", false);
  ////

  // A Yarn container runs in an independent JVM. We need to pass the argument manually here if
  // the JDK version >= 17. Otherwise, the MiniMRCluster will fail.
  if (JVM.getJVMSpecVersion() >= 17) {
    String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
    conf.set("yarn.app.mapreduce.am.command-opts",
      jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
  }

  // Allow the user to override the FS URI for this map-reduce cluster to use.
  mrCluster =
    new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
      1, null, null, new JobConf(this.conf));
  JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
  if (jobConf == null) {
    jobConf = mrCluster.createJobConf();
  }

  // Hadoop MiniMR overwrites this while it should not
  jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
  LOG.info("Mini mapreduce cluster started");

  // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
  // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
  // necessary config properties here. YARN-129 required adding a few properties.
  conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
  // this is for mrv2 support; mr1 ignores this
  conf.set("mapreduce.framework.name", "yarn");
  conf.setBoolean("yarn.is.minicluster", true);
  String rmAddress = jobConf.get("yarn.resourcemanager.address");
  if (rmAddress != null) {
    conf.set("yarn.resourcemanager.address", rmAddress);
  }
  String historyAddress = jobConf.get("mapreduce.jobhistory.address");
  if (historyAddress != null) {
    conf.set("mapreduce.jobhistory.address", historyAddress);
  }
  String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
  if (schedulerAddress != null) {
    conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
  }
  String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
  if (mrJobHistoryWebappAddress != null) {
    conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
  }
  String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
  if (yarnRMWebappAddress != null) {
    conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
  }
}

/**
 * Stops the previously started <code>MiniMRCluster</code>.
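 * <p>
 * Pairs with {@link #startMiniMapReduceCluster()}, e.g. (sketch):
 * <pre>
 * startMiniMapReduceCluster();
 * try {
 *   // ... run the MapReduce job under test ...
 * } finally {
 *   shutdownMiniMapReduceCluster();
 * }
 * </pre>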
 */
public void shutdownMiniMapReduceCluster() {
  if (mrCluster != null) {
    LOG.info("Stopping mini mapreduce cluster...");
    mrCluster.shutdown();
    mrCluster = null;
    LOG.info("Mini mapreduce cluster stopped");
  }
  // Restore configuration to point to local jobtracker
  conf.set("mapreduce.jobtracker.address", "local");
}

/**
 * Create a stubbed out RegionServerServices, mainly for getting FS.
 */
public RegionServerServices createMockRegionServerService() throws IOException {
  return createMockRegionServerService((ServerName) null);
}

/**
 * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
 * TestTokenAuthentication
 */
public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
  throws IOException {
  final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
  rss.setFileSystem(getTestFileSystem());
  rss.setRpcServer(rpc);
  return rss;
}

/**
 * Create a stubbed out RegionServerServices, mainly for getting FS. This version is used by
 * TestOpenRegionHandler
 */
public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
  final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
  rss.setFileSystem(getTestFileSystem());
  return rss;
}

/**
 * Expire the Master's session
 */
public void expireMasterSession() throws Exception {
  HMaster master = getMiniHBaseCluster().getMaster();
  expireSession(master.getZooKeeper(), false);
}

/**
 * Expire a region server's session
 * @param index which RS
 */
public void expireRegionServerSession(int index) throws Exception {
  HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
  expireSession(rs.getZooKeeper(), false);
  decrementMinRegionServerCount();
}

private void decrementMinRegionServerCount() {
  // decrement the count for this.conf, for the newly spawned master
  // this.hbaseCluster shares this configuration too
  decrementMinRegionServerCount(getConfiguration());

  // each master thread keeps a copy of configuration
  for (MasterThread master : getHBaseCluster().getMasterThreads()) {
    decrementMinRegionServerCount(master.getMaster().getConfiguration());
  }
}

private void decrementMinRegionServerCount(Configuration conf) {
  int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
  if (currentCount != -1) {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
  }
}

public void expireSession(ZKWatcher nodeZK) throws Exception {
  expireSession(nodeZK, false);
}

/**
 * Expire a ZooKeeper session as recommended in ZooKeeper documentation
 * http://hbase.apache.org/book.html#trouble.zookeeper
 * <p/>
 * There are issues when doing this:
 * <ol>
 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
 * </ol>
 * @param nodeZK - the ZK watcher to expire
 * @param checkStatus - true to check if we can create a Table with the current configuration.
 */
public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
  Configuration c = new Configuration(this.conf);
  String quorumServers = ZKConfig.getZKQuorumServersString(c);
  ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
  byte[] password = zk.getSessionPasswd();
  long sessionID = zk.getSessionId();

  // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
  // so we create a first watcher to be sure that the
  // event was sent. We expect that if our watcher receives the event,
  // other watchers on the same machine will get it as well.
  // When we ask to close the connection, ZK does not close it before
  // we receive all the events, so we don't have to capture the event;
  // just closing the connection should be enough.
  ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
      LOG.info("Monitor ZKW received event=" + watchedEvent);
    }
  }, sessionID, password);

  // Making it expire
  ZooKeeper newZK =
    new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);

  // Ensure that we have a connection to the server before closing down, otherwise
  // the close-session event will be eaten before we even reach the CONNECTING state.
  long start = EnvironmentEdgeManager.currentTime();
  while (
    newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
  ) {
    Thread.sleep(1);
  }
  newZK.close();
  LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));

  // Now closing & waiting to be sure that the clients get it.
  monitor.close();

  if (checkStatus) {
    getConnection().getTable(TableName.META_TABLE_NAME).close();
  }
}

/**
 * Get the Mini HBase cluster.
 * @return hbase cluster
 * @see #getHBaseClusterInterface()
 */
public SingleProcessHBaseCluster getHBaseCluster() {
  return getMiniHBaseCluster();
}

/**
 * Returns the HBaseCluster instance.
 * <p>
 * The returned object can be any of the subclasses of HBaseCluster, and the tests referring to
 * this should not assume that the cluster is a mini cluster or a distributed one. If the test
 * only works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be
 * used instead without the need to type-cast.
 */
public HBaseClusterInterface getHBaseClusterInterface() {
  // implementation note: we should rename this method as #getHBaseCluster(),
  // but this would require refactoring 90+ calls.
  return hbaseCluster;
}

/**
 * Resets the connections so that the next time getConnection() is called, a new connection is
 * created. This is needed in cases where the entire cluster / all the masters are shutdown and
 * the connection is not valid anymore.
 * <p/>
 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
 * written, not all start() stop() calls go through this class. Most tests directly operate on
 * the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
 * maintain the connection state automatically. Cleaning this is a much bigger refactor.
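 * <p>
 * Typical use after a full cluster restart (sketch):
 * <pre>
 * // ... shut down and restart the mini cluster ...
 * invalidateConnection();
 * Connection c = getConnection(); // fresh connection picking up the new master addresses
 * </pre>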
 */
public void invalidateConnection() throws IOException {
  closeConnection();
  // Update the master addresses if they changed.
  final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
  final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
  LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
    masterConfigBefore, masterConfAfter);
  conf.set(HConstants.MASTER_ADDRS_KEY,
    getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY));
}

/**
 * Get a shared Connection to the cluster. This method is thread safe.
 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
 */
public Connection getConnection() throws IOException {
  return getAsyncConnection().toConnection();
}

/**
 * Get an assigned Connection to the cluster. This method is thread safe.
 * @param user assigned user
 * @return A Connection with the assigned user.
 */
public Connection getConnection(User user) throws IOException {
  return getAsyncConnection(user).toConnection();
}

/**
 * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
 *         of cluster.
 */
public AsyncClusterConnection getAsyncConnection() throws IOException {
  try {
    return asyncConnection.updateAndGet(connection -> {
      if (connection == null) {
        try {
          User user = UserProvider.instantiate(conf).getCurrent();
          connection = getAsyncConnection(user);
        } catch (IOException ioe) {
          throw new UncheckedIOException("Failed to create connection", ioe);
        }
      }
      return connection;
    });
  } catch (UncheckedIOException exception) {
    throw exception.getCause();
  }
}

/**
 * Get an assigned AsyncClusterConnection to the cluster. This method is thread safe.
 * @param user assigned user
 * @return An AsyncClusterConnection with the assigned user.
 */
public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
  return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
}

public void closeConnection() throws IOException {
  if (hbaseAdmin != null) {
    Closeables.close(hbaseAdmin, true);
    hbaseAdmin = null;
  }
  AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
  if (asyncConnection != null) {
    Closeables.close(asyncConnection, true);
  }
}

/**
 * Returns an Admin instance which is shared between HBaseTestingUtil instance users. Closing it
 * has no effect; it will be closed automatically when the cluster shuts down.
 */
public Admin getAdmin() throws IOException {
  if (hbaseAdmin == null) {
    this.hbaseAdmin = getConnection().getAdmin();
  }
  return hbaseAdmin;
}

private Admin hbaseAdmin = null;

/**
 * Returns an {@link Hbck} instance. Needs to be closed when done.
 */
public Hbck getHbck() throws IOException {
  return getConnection().getHbck();
}

/**
 * Unassign the named region.
 * @param regionName The region to unassign.
 */
public void unassignRegion(String regionName) throws IOException {
  unassignRegion(Bytes.toBytes(regionName));
}

/**
 * Unassign the named region.
 * @param regionName The region to unassign.
 */
public void unassignRegion(byte[] regionName) throws IOException {
  getAdmin().unassign(regionName);
}

/**
 * Closes the region containing the given row.
 * @param row The row to find the containing region.
 * @param table The table to find the region.
 */
public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
  unassignRegionByRow(Bytes.toBytes(row), table);
}

/**
 * Closes the region containing the given row.
 * @param row The row to find the containing region.
 * @param table The table to find the region.
 */
public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
  HRegionLocation hrl = table.getRegionLocation(row);
  unassignRegion(hrl.getRegion().getRegionName());
}

/**
 * Retrieves a splittable region randomly from tableName
 * @param tableName name of table
 * @param maxAttempts maximum number of attempts, unlimited for a value of -1
 * @return the HRegion chosen, null if none was found within the limit of maxAttempts
 */
public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
  List<HRegion> regions = getHBaseCluster().getRegions(tableName);
  int regCount = regions.size();
  Set<Integer> attempted = new HashSet<>();
  int idx;
  int attempts = 0;
  do {
    regions = getHBaseCluster().getRegions(tableName);
    if (regCount != regions.size()) {
      // if there was region movement, clear attempted Set
      attempted.clear();
    }
    regCount = regions.size();
    // There is a chance that, before we get the region for the table from an RS, the region
    // may be going for CLOSE. This may be because online schema change is enabled.
    if (regCount > 0) {
      idx = ThreadLocalRandom.current().nextInt(regCount);
      // if we have just tried this region, there is no need to try again
      if (attempted.contains(idx)) {
        continue;
      }
      HRegion region = regions.get(idx);
      if (region.checkSplit().isPresent()) {
        return region;
      }
      attempted.add(idx);
    }
    attempts++;
  } while (maxAttempts == -1 || attempts < maxAttempts);
  return null;
}

public MiniDFSCluster getDFSCluster() {
  return dfsCluster;
}

public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
  setDFSCluster(cluster, true);
}

/**
 * Set the MiniDFSCluster
 * @param cluster cluster to use
 * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
 *          it is set.
 * @throws IllegalStateException if the passed cluster is up when it is required to be down
 * @throws IOException if the FileSystem could not be set from the passed dfs cluster
 */
public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
  throws IllegalStateException, IOException {
  if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
    throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
  }
  this.dfsCluster = cluster;
  this.setFs();
}

public FileSystem getTestFileSystem() throws IOException {
  return HFileSystem.get(conf);
}

/**
 * Wait until all regions in a table have been assigned. Waits the default timeout (30 seconds)
 * before giving up.
 * @param table Table to wait on.
 */
public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
  waitTableAvailable(table.getName(), 30000);
}

public void waitTableAvailable(TableName table, long timeoutMillis)
  throws InterruptedException, IOException {
  waitFor(timeoutMillis, predicateTableAvailable(table));
}

/**
 * Wait until all regions in a table have been assigned
 * @param table Table to wait on.
 * @param timeoutMillis Timeout.
 */
public void waitTableAvailable(byte[] table, long timeoutMillis)
  throws InterruptedException, IOException {
  waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
}

public String explainTableAvailability(TableName tableName) throws IOException {
  StringBuilder msg =
    new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
  if (getHBaseCluster().getMaster().isAlive()) {
    Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
      .getRegionStates().getRegionAssignments();
    final List<Pair<RegionInfo, ServerName>> metaLocations =
      MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
    for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
      RegionInfo hri = metaLocation.getFirst();
      ServerName sn = metaLocation.getSecond();
      if (!assignments.containsKey(hri)) {
        msg.append(", region ").append(hri)
          .append(" not assigned, but found in meta; it is expected to be on ").append(sn);
      } else if (sn == null) {
        msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
      } else if (!sn.equals(assignments.get(hri))) {
        msg.append(", region ").append(hri)
          .append(" assigned, but has different servers in meta and AM ( ").append(sn)
          .append(" <> ").append(assignments.get(hri));
      }
    }
  }
  return msg.toString();
}

public String explainTableState(final TableName table, TableState.State state)
  throws IOException {
  TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
  if (tableState == null) {
    return "TableState in META: No table state in META for table " + table
      + "; last state in meta (including deleted): " + findLastTableState(table);
  } else if (!tableState.inStates(state)) {
    return "TableState in META: Not " + state + " state, but " + tableState;
  } else {
    return "TableState in META: OK";
  }
}

@Nullable
public TableState findLastTableState(final TableName table) throws IOException {
  final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
  ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
    @Override
    public boolean visit(Result r) throws IOException {
      if (!Arrays.equals(r.getRow(), table.getName())) {
        return false;
      }
      TableState state = CatalogFamilyFormat.getTableState(r);
      if (state != null) {
        lastTableState.set(state);
      }
      return true;
    }
  };
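  // Scan all table rows in hbase:meta (including rows for deleted tables); the visitor above
  // records the last state seen for the requested table.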
  MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
    Integer.MAX_VALUE, visitor);
  return lastTableState.get();
}

/**
 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
 * regions have all been assigned. Will time out after the default period (30 seconds).
 * Tolerates a nonexistent table.
 * @param table the table to wait on.
 * @throws InterruptedException if interrupted while waiting
 * @throws IOException if an IO problem is encountered
 */
public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
  waitTableEnabled(table, 30000);
}

/**
 * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and the
 * regions have all been assigned.
 * @see #waitTableEnabled(TableName, long)
 * @param table Table to wait on.
 * @param timeoutMillis Time to wait on it being marked enabled.
 */
public void waitTableEnabled(byte[] table, long timeoutMillis)
  throws InterruptedException, IOException {
  waitTableEnabled(TableName.valueOf(table), timeoutMillis);
}

public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
  waitFor(timeoutMillis, predicateTableEnabled(table));
}

/**
 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
 * time out after the default period (30 seconds).
 * @param table Table to wait on.
 */
public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
  waitTableDisabled(table, 30000);
}

public void waitTableDisabled(TableName table, long millisTimeout)
  throws InterruptedException, IOException {
  waitFor(millisTimeout, predicateTableDisabled(table));
}

/**
 * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
 * @param table Table to wait on.
 * @param timeoutMillis Time to wait on it being marked disabled.
 */
public void waitTableDisabled(byte[] table, long timeoutMillis)
  throws InterruptedException, IOException {
  waitTableDisabled(TableName.valueOf(table), timeoutMillis);
}

/**
 * Make sure that at least the specified number of region servers are running
 * @param num minimum number of region servers that should be running
 * @return true if we started some servers
 */
public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
  boolean startedServer = false;
  SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
  for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
    LOG.info("Started new server=" + hbaseCluster.startRegionServer());
    startedServer = true;
  }

  return startedServer;
}

/**
 * Make sure that at least the specified number of region servers are running. We don't count
 * the ones that are currently stopping or are stopped.
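 * <p>
 * For example, to guarantee two usable region servers before a failover test (sketch):
 * <pre>
 * ensureSomeNonStoppedRegionServersAvailable(2);
 * </pre>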
   * @param num minimum number of region servers that should be running
   * @return true if we started some servers
   */
  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
    boolean startedServer = ensureSomeRegionServersAvailable(num);

    int nonStoppedServers = 0;
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      HRegionServer hrs = rst.getRegionServer();
      if (hrs.isStopping() || hrs.isStopped()) {
        LOG.info("A region server is stopped or stopping:" + hrs);
      } else {
        nonStoppedServers++;
      }
    }
    for (int i = nonStoppedServers; i < num; ++i) {
      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
      startedServer = true;
    }
    return startedServer;
  }

  /**
   * This method clones the passed <code>c</code> configuration, setting a new user into the clone.
   * Use it when getting new instances of FileSystem. Only works for DistributedFileSystem w/o
   * Kerberos.
   * @param c Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new configuration instance with a different user set into it.
   */
  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
    throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
      return User.getCurrent();
    }
    // Else distributed filesystem. Make a new instance per daemon. Below
    // code is taken from the AppendTestUtil over in hdfs.
    String username = User.getCurrent().getName() + differentiatingSuffix;
    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
    return user;
  }

  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      try {
        for (RegionInfo region : ProtobufUtil
          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
          online.add(region.getRegionNameAsString());
        }
      } catch (RegionServerStoppedException e) {
        // That's fine.
      }
    }
    return online;
  }

  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it's hard-coded to 5 and makes
   * tests linger. Here is the exception you'll see:
   *
   * <pre>
   * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block
   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
   * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683
   * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
   * </pre>
   *
   * @param stream A DFSClient.DFSOutputStream.
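   * @param max the new maximum recovery error count to set via reflection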
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
    try {
      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz : clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      LOG.info("Could not set max recovery field", e);
    }
  }

  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
   * @return true if the region is assigned, false otherwise.
   */
  public boolean assignRegion(final RegionInfo regionInfo)
    throws IOException, InterruptedException {
    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
    am.assign(regionInfo);
    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
  }

  /**
   * Move region to destination server and wait till region is completely moved and online.
   * @param destRegion region to move
   * @param destServer destination server of the region
   */
  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
    throws InterruptedException, IOException {
    HMaster master = getMiniHBaseCluster().getMaster();
    // TODO: Here we start the move. The move can take a while.
    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
    while (true) {
      ServerName serverName =
        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
      if (serverName != null && serverName.equals(destServer)) {
        assertRegionOnServer(destRegion, serverName, 2000);
        break;
      }
      Thread.sleep(10);
    }
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been deployed,
   * and the master has been informed and has updated hbase:meta with the regions' deployed server.
   * @param tableName the table name
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
    waitUntilAllRegionsAssigned(tableName,
      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
  }

  /**
   * Wait until all of the system table's regions get assigned.
   */
  public void waitUntilAllSystemRegionsAssigned() throws IOException {
    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, and the master has been informed and has
   * updated hbase:meta with the regions' deployed server.
   * @param tableName the table name
   * @param timeout timeout, in milliseconds
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
    if (!TableName.isMetaTableName(tableName)) {
      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
          + timeout + "ms");
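        // Poll hbase:meta every 200ms until every region row for the table carries a live
        // info:server value and an OPEN state, or until the timeout elapses.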
        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
          @Override
          public String explainFailure() throws IOException {
            return explainTableAvailability(tableName);
          }

          @Override
          public boolean evaluate() throws IOException {
            Scan scan = new Scan();
            scan.addFamily(HConstants.CATALOG_FAMILY);
            boolean tableFound = false;
            try (ResultScanner s = meta.getScanner(scan)) {
              for (Result r; (r = s.next()) != null;) {
                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                RegionInfo info = RegionInfo.parseFromOrNull(b);
                if (info != null && info.getTable().equals(tableName)) {
                  // Get server hosting this region from catalog family. Return false if no server
                  // hosting this region, or if the server hosting this region was recently killed
                  // (for fault tolerance testing).
                  tableFound = true;
                  byte[] server =
                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
                  if (server == null) {
                    return false;
                  } else {
                    byte[] startCode =
                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
                    ServerName serverName =
                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
                        + Bytes.toLong(startCode));
                    if (
                      !getHBaseClusterInterface().isDistributedCluster()
                        && getHBaseCluster().isKilledRS(serverName)
                    ) {
                      return false;
                    }
                  }
                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
                    return false;
                  }
                }
              }
            }
            if (!tableFound) {
              LOG.warn(
                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
            }
            return tableFound;
          }
        });
      }
    }
    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
    // check from the master state if we are using a mini cluster
    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table but make sure master knows of the assignments before
      // returning -- sometimes this can lag.
      HMaster master = getHBaseCluster().getMaster();
      final RegionStates states = master.getAssignmentManager().getRegionStates();
      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
        @Override
        public String explainFailure() throws IOException {
          return explainTableAvailability(tableName);
        }

        @Override
        public boolean evaluate() throws IOException {
          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
          return hris != null && !hris.isEmpty();
        }
      });
    }
    LOG.info("All regions for table " + tableName + " assigned.");
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods of
   * querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
      // readpoint 0.
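      // A fixed read point of 0 is passed below as the scanner's read point (see the note above).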
      0);

    List<Cell> result = new ArrayList<>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // verify that we are on the row we want:
      Cell kv = result.get(0);
      if (!CellUtil.matchingRows(kv, get.getRow())) {
        result.clear();
      }
    }
    scanner.close();
    return result;
  }

  /**
   * Create region split keys between startKey and endKey.
   * @param numRegions the number of regions to be created. It has to be greater than 3.
   * @return resulting split keys
   */
  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
    assertTrue(numRegions > 3);
    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
    byte[][] result = new byte[tmpSplitKeys.length + 1][];
    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
    result[0] = HConstants.EMPTY_BYTE_ARRAY;
    return result;
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods of
   * querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
    throws IOException {
    Get get = new Get(row);
    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
    s.put(store.getColumnFamilyDescriptor().getName(), columns);

    return getFromStoreFile(store, get);
  }

  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
    final List<? extends Cell> actual) {
    final int eLen = expected.size();
    final int aLen = actual.size();
    final int minLen = Math.min(eLen, aLen);

    int i = 0;
    while (
      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
    ) {
      i++;
    }

    if (additionalMsg == null) {
      additionalMsg = "";
    }
    if (!additionalMsg.isEmpty()) {
      additionalMsg = ". " + additionalMsg;
    }

    if (eLen != aLen || i != minLen) {
      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
        + " (length " + aLen + ")" + additionalMsg);
    }
  }

  public static <T> String safeGetAsStr(List<T> lst, int i) {
    if (0 <= i && i < lst.size()) {
      return lst.get(i).toString();
    } else {
      return "<out_of_range>";
    }
  }

  public String getClusterKey() {
    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
      + ":"
      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  }

  /**
   * Creates a random table with the given parameters.
   */
  public Table createRandomTable(TableName tableName, final Collection<String> families,
    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
    final int numRowsPerFlush) throws IOException, InterruptedException {
    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
      + maxVersions + "\n");

    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
    final int numCF = families.size();
    final byte[][] cfBytes = new byte[numCF][];
    {
      int cfIndex = 0;
      for (String cf : families) {
        cfBytes[cfIndex++] = Bytes.toBytes(cf);
      }
    }

    final int actualStartKey = 0;
    final int actualEndKey = Integer.MAX_VALUE;
    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
    final int splitStartKey = actualStartKey + keysPerRegion;
    final int splitEndKey = actualEndKey - keysPerRegion;
    final String keyFormat = "%08x";
    final Table table = createTable(tableName, cfBytes, maxVersions,
      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);

    if (hbaseCluster != null) {
      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
    }

    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);

    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
        final byte[] row = Bytes.toBytes(
          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));

        Put put = new Put(row);
        Delete del = new Delete(row);
        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
          final byte[] cf = cfBytes[rand.nextInt(numCF)];
          final long ts = rand.nextInt();
          final byte[] qual = Bytes.toBytes("col" + iCol);
          if (rand.nextBoolean()) {
            final byte[] value =
              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
            put.addColumn(cf, qual, ts, value);
          } else if (rand.nextDouble() < 0.8) {
            del.addColumn(cf, qual, ts);
          } else {
            del.addColumns(cf, qual, ts);
          }
        }

        if (!put.isEmpty()) {
          mutator.mutate(put);
        }

        if (!del.isEmpty()) {
          mutator.mutate(del);
        }
      }
      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
      mutator.flush();
      if (hbaseCluster != null) {
        getMiniHBaseCluster().flushcache(table.getName());
      }
    }
    mutator.close();

    return table;
  }

  public static int randomFreePort() {
    return HBaseCommonTestingUtil.randomFreePort();
  }

  public static String randomMultiCastAddress() {
    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
  }

  public static void waitForHostPort(String host, int port) throws IOException {
    final int maxTimeMs = 10000;
    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
    IOException savedException = null;
    LOG.info("Waiting for server at " + host + ":" + port);
    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
      try {
        Socket sock = new Socket(InetAddress.getByName(host), port);
        sock.close();
        savedException = null;
        LOG.info("Server at " + host + ":" + port + " is available");
        break;
      } catch (UnknownHostException e) {
        throw new IOException("Failed to look up " + host, e);
      } catch (IOException e) {
        savedException = e;
      }
      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
    }

    if (savedException != null) {
      throw savedException;
    }
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding)
    throws IOException {
    return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression,
      dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setDurability(durability);
    builder.setRegionReplication(regionReplication);
    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
    cfBuilder.setCompressionType(compression);
    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
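   * <p>
   * Illustrative call (a sketch; the table name and tuning values are arbitrary example values):
   *
   * <pre>
   * int regions = createPreSplitLoadTestTable(conf, TableName.valueOf("loadtest"),
   *   new byte[][] { Bytes.toBytes("cf") }, Compression.Algorithm.NONE, DataBlockEncoding.NONE,
   *   5, 1, Durability.USE_DEFAULT); // five regions per server, replication factor 1
   * </pre>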
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setDurability(durability);
    builder.setRegionReplication(regionReplication);
    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
    for (int i = 0; i < columnFamilies.length; i++) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
      cfBuilder.setCompressionType(compression);
      hcds[i] = cfBuilder.build();
    }
    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor hcd) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
    ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
    throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
    for (ColumnFamilyDescriptor cd : cds) {
      if (!td.hasColumnFamily(cd.getName())) {
        builder.setColumnFamily(cd);
      }
    }
    td = builder.build();
    int totalNumberOfRegions = 0;
    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
    Admin admin = unmanagedConnection.getAdmin();

    try {
      // Create a table with pre-split regions. The number of splits is
      // (region servers * regions per region server).
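      // Scale the region count to the cluster: each live region server gets
      // numRegionsPerServer regions of the new table.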
      int numberOfServers = admin.getRegionServers().size();
      if (numberOfServers == 0) {
        throw new IllegalStateException("No live regionservers");
      }

      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
      LOG.info("Number of live regionservers: " + numberOfServers + ", "
        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
        + numRegionsPerServer + ")");

      byte[][] splits = splitter.split(totalNumberOfRegions);

      admin.createTable(td, splits);
    } catch (MasterNotRunningException e) {
      LOG.error("Master not running", e);
      throw new IOException(e);
    } catch (TableExistsException e) {
      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
    } finally {
      admin.close();
      unmanagedConnection.close();
    }
    return totalNumberOfRegions;
  }

  public static int getMetaRSPort(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
    }
  }

  /**
   * Due to an async racing issue, a region may not be in the online region list of a region server
   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
   */
  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  /**
   * Check to make sure the region is open on the specified region server, but not on any other
   * one.
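   * @param hri the region expected to be deployed exactly once
   * @param server the only server the region should be online on
   * @param timeout how long, in milliseconds, to keep checking before failing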
   */
  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
        List<JVMClusterUtil.RegionServerThread> rsThreads =
          getHBaseCluster().getLiveRegionServerThreads();
        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
          HRegionServer rs = rsThread.getRegionServer();
          if (server.equals(rs.getServerName())) {
            continue;
          }
          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
          for (HRegion r : hrs) {
            assertTrue("Region should not be double assigned",
              r.getRegionInfo().getRegionId() != hri.getRegionId());
          }
        }
        return; // good, we are happy
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
  }

  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
    BlockCache blockCache) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
  }

  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }

  /**
   * Returns a {@link Predicate} for checking that there are no regions in transition in master
   */
  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final RegionStates regionStates =
          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
        return "found in transition: " + regionStates.getRegionsInTransition().toString();
      }

      @Override
      public boolean evaluate() throws IOException {
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) return false;
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) return false;
        return !am.hasRegionsInTransition();
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is enabled
   */
  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.ENABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is disabled
   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.DISABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().isTableDisabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is available
   */
  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          try (Table table = getConnection().getTable(tableName)) {
            TableDescriptor htd = table.getDescriptor();
            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
              .getAllRegionLocations()) {
              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
              for (byte[] family : htd.getColumnFamilyNames()) {
                scan.addFamily(family);
              }
              try (ResultScanner scanner = table.getScanner(scan)) {
                scanner.next();
              }
            }
          }
        }
        return tableAvailable;
      }
    };
  }

  /**
   * Wait until no regions in transition.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionsInTransition());
  }

  /**
   * Wait until no regions in transition. (time limit 15min)
   */
  public void waitUntilNoRegionsInTransition() throws IOException {
    waitUntilNoRegionsInTransition(15 * 60000);
  }

  /**
   * Wait until labels are ready in VisibilityLabelsCache.
   */
  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {

      @Override
      public boolean evaluate() {
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() {
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return label + " is not available yet";
          }
        }
        return "";
      }
    });
  }

  /**
   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
   * available.
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
    return generateColumnDescriptors("");
  }

  /**
   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
   * available.
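   * <p>
   * One family is generated for every combination of supported compression algorithm, data block
   * encoding, and bloom type; names follow the {@code %s-cf-!@#&-%d!@#} format with a running
   * family id.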
   * @param prefix family names prefix
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    long familyId = 0;
    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
        for (BloomType bloomType : BloomType.values()) {
          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
          familyId++;
        }
      }
    }
    return columnFamilyDescriptors;
  }

  /**
   * Get supported compression algorithms.
   * @return supported compression algorithms.
   */
  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
    for (String algoName : allAlgos) {
      try {
        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
        algo.getCompressor();
        supportedAlgos.add(algo);
      } catch (Throwable t) {
        // this algo is not available
      }
    }
    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
  }

  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
    Scan scan = new Scan().withStartRow(row);
    scan.setReadType(ReadType.PREAD);
    scan.setCaching(1);
    scan.setReversed(true);
    scan.addFamily(family);
    try (RegionScanner scanner = r.getScanner(scan)) {
      List<Cell> cells = new ArrayList<>(1);
      scanner.next(cells);
      // Guard against an empty result before dereferencing cells.get(0).
      if (
        r.getRegionInfo().isMetaRegion() && (cells.isEmpty() || !isTargetTable(row, cells.get(0)))
      ) {
        return null;
      }
      return Result.create(cells);
    }
  }

  private boolean isTargetTable(final byte[] inRow, Cell c) {
    String inputRowString = Bytes.toString(inRow);
    int i = inputRowString.indexOf(HConstants.DELIMITER);
    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
    int o = outputRowString.indexOf(HConstants.DELIMITER);
    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
  }

  /**
   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
   * kerby KDC server and utility for using it,
   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It came in with HBASE-5291.
   */
  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
    Properties conf = MiniKdc.createConf();
    conf.put(MiniKdc.DEBUG, true);
    MiniKdc kdc = null;
    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It's possible that
    // another service captures the port in between, which will result in a BindException.
    boolean bindException;
    int numTries = 0;
    do {
      try {
        bindException = false;
        dir = new File(getDataTestDir("kdc").toUri().getPath());
        kdc = new MiniKdc(conf, dir);
        kdc.start();
      } catch (BindException e) {
        FileUtils.deleteDirectory(dir); // clean directory
        numTries++;
        if (numTries == 3) {
          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
          throw e;
        }
        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
        bindException = true;
      }
    } while (bindException);
    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
    return kdc;
  }

  public int getNumHFiles(final TableName tableName, final byte[] family) {
    int numHFiles = 0;
    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
    }
    return numHFiles;
  }

  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
    final byte[] family) {
    int numHFiles = 0;
    for (Region region : rs.getRegions(tableName)) {
      numHFiles += region.getStore(family).getStorefilesCount();
    }
    return numHFiles;
  }

  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
    assertEquals(ltdFamilies.size(), rtdFamilies.size());
    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
        it2 = rtdFamilies.iterator(); it.hasNext();) {
      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
    }
  }

  /**
   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
   * invocations.
   */
  public static void await(final long sleepMillis, final BooleanSupplier condition)
    throws InterruptedException {
    try {
      while (!condition.getAsBoolean()) {
        Thread.sleep(sleepMillis);
      }
    } catch (RuntimeException e) {
      if (e.getCause() instanceof AssertionError) {
        throw (AssertionError) e.getCause();
      }
      throw e;
    }
  }
}
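// Illustrative use of await (a sketch; 'flushed' is an example flag owned by the calling test,
// and the 100ms sleep interval is an arbitrary example value):
//
//   AtomicBoolean flushed = new AtomicBoolean();
//   ... trigger background work that eventually sets 'flushed' ...
//   await(100, flushed::get);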