001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.File; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.io.UncheckedIOException; 024import java.lang.reflect.Field; 025import java.lang.reflect.Modifier; 026import java.net.BindException; 027import java.net.DatagramSocket; 028import java.net.InetAddress; 029import java.net.ServerSocket; 030import java.net.Socket; 031import java.net.UnknownHostException; 032import java.nio.charset.StandardCharsets; 033import java.security.MessageDigest; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Map; 042import java.util.NavigableSet; 043import java.util.Properties; 044import java.util.Random; 045import java.util.Set; 046import java.util.TreeSet; 047import java.util.concurrent.ThreadLocalRandom; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicReference; 050import java.util.function.BooleanSupplier; 051import org.apache.commons.io.FileUtils; 
052import org.apache.commons.lang3.RandomStringUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 057import org.apache.hadoop.hbase.Waiter.Predicate; 058import org.apache.hadoop.hbase.client.Admin; 059import org.apache.hadoop.hbase.client.AsyncClusterConnection; 060import org.apache.hadoop.hbase.client.BufferedMutator; 061import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 064import org.apache.hadoop.hbase.client.Connection; 065import org.apache.hadoop.hbase.client.ConnectionFactory; 066import org.apache.hadoop.hbase.client.Consistency; 067import org.apache.hadoop.hbase.client.Delete; 068import org.apache.hadoop.hbase.client.Durability; 069import org.apache.hadoop.hbase.client.Get; 070import org.apache.hadoop.hbase.client.Hbck; 071import org.apache.hadoop.hbase.client.MasterRegistry; 072import org.apache.hadoop.hbase.client.Put; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.client.RegionLocator; 076import org.apache.hadoop.hbase.client.Result; 077import org.apache.hadoop.hbase.client.ResultScanner; 078import org.apache.hadoop.hbase.client.Scan; 079import org.apache.hadoop.hbase.client.Scan.ReadType; 080import org.apache.hadoop.hbase.client.Table; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.client.TableState; 084import org.apache.hadoop.hbase.fs.HFileSystem; 085import org.apache.hadoop.hbase.io.compress.Compression; 086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import 
org.apache.hadoop.hbase.io.hfile.BlockCache; 089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 090import org.apache.hadoop.hbase.io.hfile.HFile; 091import org.apache.hadoop.hbase.ipc.RpcServerInterface; 092import org.apache.hadoop.hbase.keymeta.KeymetaAdminClient; 093import org.apache.hadoop.hbase.logging.Log4jUtils; 094import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 095import org.apache.hadoop.hbase.master.HMaster; 096import org.apache.hadoop.hbase.master.RegionState; 097import org.apache.hadoop.hbase.master.ServerManager; 098import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 099import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 100import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 101import org.apache.hadoop.hbase.master.assignment.RegionStates; 102import org.apache.hadoop.hbase.mob.MobFileCache; 103import org.apache.hadoop.hbase.regionserver.BloomType; 104import org.apache.hadoop.hbase.regionserver.ChunkCreator; 105import org.apache.hadoop.hbase.regionserver.HRegion; 106import org.apache.hadoop.hbase.regionserver.HRegionServer; 107import org.apache.hadoop.hbase.regionserver.HStore; 108import org.apache.hadoop.hbase.regionserver.InternalScanner; 109import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 110import org.apache.hadoop.hbase.regionserver.Region; 111import org.apache.hadoop.hbase.regionserver.RegionScanner; 112import org.apache.hadoop.hbase.regionserver.RegionServerServices; 113import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 114import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 115import org.apache.hadoop.hbase.security.User; 116import org.apache.hadoop.hbase.security.UserProvider; 117import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 118import org.apache.hadoop.hbase.util.Bytes; 119import org.apache.hadoop.hbase.util.CommonFSUtils; 120import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 121import 
org.apache.hadoop.hbase.util.FSUtils; 122import org.apache.hadoop.hbase.util.JVMClusterUtil; 123import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 124import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 125import org.apache.hadoop.hbase.util.Pair; 126import org.apache.hadoop.hbase.util.ReflectionUtils; 127import org.apache.hadoop.hbase.util.RegionSplitter; 128import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; 129import org.apache.hadoop.hbase.util.RetryCounter; 130import org.apache.hadoop.hbase.util.Threads; 131import org.apache.hadoop.hbase.wal.WAL; 132import org.apache.hadoop.hbase.wal.WALFactory; 133import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 134import org.apache.hadoop.hbase.zookeeper.ZKConfig; 135import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 136import org.apache.hadoop.hdfs.DFSClient; 137import org.apache.hadoop.hdfs.DistributedFileSystem; 138import org.apache.hadoop.hdfs.MiniDFSCluster; 139import org.apache.hadoop.hdfs.server.datanode.DataNode; 140import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 141import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 142import org.apache.hadoop.mapred.JobConf; 143import org.apache.hadoop.mapred.MiniMRCluster; 144import org.apache.hadoop.mapred.TaskLog; 145import org.apache.hadoop.minikdc.MiniKdc; 146import org.apache.yetus.audience.InterfaceAudience; 147import org.apache.zookeeper.WatchedEvent; 148import org.apache.zookeeper.ZooKeeper; 149import org.apache.zookeeper.ZooKeeper.States; 150 151import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 152 153import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 154 155/** 156 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 157 * functionality. Create an instance and keep it around testing HBase. This class is meant to be 158 * your one-stop shop for anything you might need testing. Manages one cluster at a time only. 
159 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type 160 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j 161 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set 162 * logging levels. In the configuration properties, default values for master-info-port and 163 * region-server-port are overridden such that a random port will be assigned (thus avoiding port 164 * contention if another local HBase instance is already running). 165 * <p> 166 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" 167 * setting it to true. 168 * @deprecated since 3.0.0, will be removed in 4.0.0. Use 169 * {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead. 170 */ 171@InterfaceAudience.Public 172@Deprecated 173public class HBaseTestingUtility extends HBaseZKTestingUtility { 174 175 public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server"; 176 /** 177 * The default number of regions per regionserver when creating a pre-split table. 178 */ 179 public static final int DEFAULT_REGIONS_PER_SERVER = 3; 180 181 private MiniDFSCluster dfsCluster = null; 182 private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null; 183 184 private volatile HBaseCluster hbaseCluster = null; 185 private MiniMRCluster mrCluster = null; 186 187 /** If there is a mini cluster running for this testing utility instance. 
*/ 188 private volatile boolean miniClusterRunning; 189 190 private String hadoopLogDir; 191 192 /** 193 * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility 194 */ 195 private Path dataTestDirOnTestFS = null; 196 197 private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>(); 198 199 /** Filesystem URI used for map-reduce mini-cluster setup */ 200 private static String FS_URI; 201 202 /** This is for unit tests parameterized with a single boolean. */ 203 public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination(); 204 205 private Admin hbaseAdmin = null; 206 207 /** 208 * Checks to see if a specific port is available. 209 * @param port the port number to check for availability 210 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 211 */ 212 public static boolean available(int port) { 213 ServerSocket ss = null; 214 DatagramSocket ds = null; 215 try { 216 ss = new ServerSocket(port); 217 ss.setReuseAddress(true); 218 ds = new DatagramSocket(port); 219 ds.setReuseAddress(true); 220 return true; 221 } catch (IOException e) { 222 // Do nothing 223 } finally { 224 if (ds != null) { 225 ds.close(); 226 } 227 228 if (ss != null) { 229 try { 230 ss.close(); 231 } catch (IOException e) { 232 /* should not be thrown */ 233 } 234 } 235 } 236 237 return false; 238 } 239 240 /** 241 * Create all combinations of Bloom filters and compression algorithms for testing. 
242 */ 243 private static List<Object[]> bloomAndCompressionCombinations() { 244 List<Object[]> configurations = new ArrayList<>(); 245 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) { 246 for (BloomType bloomType : BloomType.values()) { 247 configurations.add(new Object[] { comprAlgo, bloomType }); 248 } 249 } 250 return Collections.unmodifiableList(configurations); 251 } 252 253 /** 254 * Create combination of memstoreTS and tags 255 */ 256 private static List<Object[]> memStoreTSAndTagsCombination() { 257 List<Object[]> configurations = new ArrayList<>(); 258 configurations.add(new Object[] { false, false }); 259 configurations.add(new Object[] { false, true }); 260 configurations.add(new Object[] { true, false }); 261 configurations.add(new Object[] { true, true }); 262 return Collections.unmodifiableList(configurations); 263 } 264 265 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 266 List<Object[]> configurations = new ArrayList<>(); 267 configurations.add(new Object[] { false, false, true }); 268 configurations.add(new Object[] { false, false, false }); 269 configurations.add(new Object[] { false, true, true }); 270 configurations.add(new Object[] { false, true, false }); 271 configurations.add(new Object[] { true, false, true }); 272 configurations.add(new Object[] { true, false, false }); 273 configurations.add(new Object[] { true, true, true }); 274 configurations.add(new Object[] { true, true, false }); 275 return Collections.unmodifiableList(configurations); 276 } 277 278 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 279 bloomAndCompressionCombinations(); 280 281 /** 282 * <p> 283 * Create an HBaseTestingUtility using a default configuration. 284 * <p> 285 * Initially, all tmp files are written to a local test data directory. 
Once 286 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 287 * data will be written to the DFS directory instead. 288 */ 289 public HBaseTestingUtility() { 290 this(HBaseConfiguration.create()); 291 } 292 293 /** 294 * <p> 295 * Create an HBaseTestingUtility using a given configuration. 296 * <p> 297 * Initially, all tmp files are written to a local test data directory. Once 298 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 299 * data will be written to the DFS directory instead. 300 * @param conf The configuration to use for further operations 301 */ 302 public HBaseTestingUtility(Configuration conf) { 303 super(conf); 304 305 // a hbase checksum verification failure will cause unit tests to fail 306 ChecksumUtil.generateExceptionForChecksumFailureForTest(true); 307 308 // Save this for when setting default file:// breaks things 309 if (this.conf.get("fs.defaultFS") != null) { 310 this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS")); 311 } 312 if (this.conf.get(HConstants.HBASE_DIR) != null) { 313 this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR)); 314 } 315 // Every cluster is a local cluster until we start DFS 316 // Note that conf could be null, but this.conf will not be 317 String dataTestDir = getDataTestDir().toString(); 318 this.conf.set("fs.defaultFS", "file:///"); 319 this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir); 320 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 321 this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 322 // If the value for random ports isn't set set it to true, thus making 323 // tests opt-out for random port assignment 324 this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, 325 this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true)); 326 } 327 328 /** 329 * Close both the region {@code r} and it's underlying WAL. 
For use in tests. 330 */ 331 public static void closeRegionAndWAL(final Region r) throws IOException { 332 closeRegionAndWAL((HRegion) r); 333 } 334 335 /** 336 * Close both the HRegion {@code r} and it's underlying WAL. For use in tests. 337 */ 338 public static void closeRegionAndWAL(final HRegion r) throws IOException { 339 if (r == null) return; 340 r.close(); 341 if (r.getWAL() == null) return; 342 r.getWAL().close(); 343 } 344 345 /** 346 * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned 347 * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed 348 * by the Configuration. If say, a Connection was being used against a cluster that had been 349 * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome. 350 * Rather than use the return direct, its usually best to make a copy and use that. Do 351 * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code> 352 * @return Instance of Configuration. 353 */ 354 @Override 355 public Configuration getConfiguration() { 356 return super.getConfiguration(); 357 } 358 359 public void setHBaseCluster(HBaseCluster hbaseCluster) { 360 this.hbaseCluster = hbaseCluster; 361 } 362 363 /** 364 * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can 365 * have many concurrent tests running if we need to. Moding a System property is not the way to do 366 * concurrent instances -- another instance could grab the temporary value unintentionally -- but 367 * not anything can do about it at moment; single instance only is how the minidfscluster works. 368 * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir 369 * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir 370 * (We do not create them!). 
371 * @return The calculated data test build directory, if newly-created. 372 */ 373 @Override 374 protected Path setupDataTestDir() { 375 Path testPath = super.setupDataTestDir(); 376 if (null == testPath) { 377 return null; 378 } 379 380 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 381 382 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 383 // we want our own value to ensure uniqueness on the same machine 384 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 385 386 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 387 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 388 return testPath; 389 } 390 391 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 392 393 String sysValue = System.getProperty(propertyName); 394 395 if (sysValue != null) { 396 // There is already a value set. So we do nothing but hope 397 // that there will be no conflicts 398 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 399 + " so I do NOT create it in " + parent); 400 String confValue = conf.get(propertyName); 401 if (confValue != null && !confValue.endsWith(sysValue)) { 402 LOG.warn(propertyName + " property value differs in configuration and system: " 403 + "Configuration=" + confValue + " while System=" + sysValue 404 + " Erasing configuration value by system value."); 405 } 406 conf.set(propertyName, sysValue); 407 } else { 408 // Ok, it's not set, so we create it as a subdirectory 409 createSubDir(propertyName, parent, subDirName); 410 System.setProperty(propertyName, conf.get(propertyName)); 411 } 412 } 413 414 /** 415 * @return Where to write test data on the test filesystem; Returns working directory for the test 416 * filesystem by default 417 * @see #setupDataTestDirOnTestFS() 418 * @see #getTestFileSystem() 419 */ 420 private Path getBaseTestDirOnTestFS() throws IOException { 
421 FileSystem fs = getTestFileSystem(); 422 return new Path(fs.getWorkingDirectory(), "test-data"); 423 } 424 425 /** 426 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 427 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 428 * on it. 429 * @return a unique path in the test filesystem 430 */ 431 public Path getDataTestDirOnTestFS() throws IOException { 432 if (dataTestDirOnTestFS == null) { 433 setupDataTestDirOnTestFS(); 434 } 435 436 return dataTestDirOnTestFS; 437 } 438 439 /** 440 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 441 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 442 * on it. 443 * @return a unique path in the test filesystem 444 * @param subdirName name of the subdir to create under the base test dir 445 */ 446 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 447 return new Path(getDataTestDirOnTestFS(), subdirName); 448 } 449 450 /** 451 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 452 * setup. 453 */ 454 private void setupDataTestDirOnTestFS() throws IOException { 455 if (dataTestDirOnTestFS != null) { 456 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 457 return; 458 } 459 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 460 } 461 462 /** 463 * Sets up a new path in test filesystem to be used by tests. 464 */ 465 private Path getNewDataTestDirOnTestFS() throws IOException { 466 // The file system can be either local, mini dfs, or if the configuration 467 // is supplied externally, it can be an external cluster FS. 
If it is a local 468 // file system, the tests should use getBaseTestDir, otherwise, we can use 469 // the working directory, and create a unique sub dir there 470 FileSystem fs = getTestFileSystem(); 471 Path newDataTestDir; 472 String randomStr = getRandomUUID().toString(); 473 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 474 newDataTestDir = new Path(getDataTestDir(), randomStr); 475 File dataTestDir = new File(newDataTestDir.toString()); 476 if (deleteOnExit()) dataTestDir.deleteOnExit(); 477 } else { 478 Path base = getBaseTestDirOnTestFS(); 479 newDataTestDir = new Path(base, randomStr); 480 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 481 } 482 return newDataTestDir; 483 } 484 485 /** 486 * Cleans the test data directory on the test filesystem. 487 * @return True if we removed the test dirs 488 */ 489 public boolean cleanupDataTestDirOnTestFS() throws IOException { 490 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 491 if (ret) dataTestDirOnTestFS = null; 492 return ret; 493 } 494 495 /** 496 * Cleans a subdirectory under the test data directory on the test filesystem. 
497 * @return True if we removed child 498 */ 499 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 500 Path cpath = getDataTestDirOnTestFS(subdirName); 501 return getTestFileSystem().delete(cpath, true); 502 } 503 504 // Workaround to avoid IllegalThreadStateException 505 // See HBASE-27148 for more details 506 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 507 508 private volatile boolean stopped = false; 509 510 private final MiniDFSCluster cluster; 511 512 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 513 super("FsDatasetAsyncDiskServiceFixer"); 514 setDaemon(true); 515 this.cluster = cluster; 516 } 517 518 @Override 519 public void run() { 520 while (!stopped) { 521 try { 522 Thread.sleep(30000); 523 } catch (InterruptedException e) { 524 Thread.currentThread().interrupt(); 525 continue; 526 } 527 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 528 // timeout of the thread pool executor is 60 seconds by default. 
529 try { 530 for (DataNode dn : cluster.getDataNodes()) { 531 FsDatasetSpi<?> dataset = dn.getFSDataset(); 532 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 533 service.setAccessible(true); 534 Object asyncDiskService = service.get(dataset); 535 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 536 group.setAccessible(true); 537 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 538 if (threadGroup.isDaemon()) { 539 threadGroup.setDaemon(false); 540 } 541 } 542 } catch (NoSuchFieldException e) { 543 LOG.debug("NoSuchFieldException: " + e.getMessage() 544 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 545 + "See HBASE-27595 for details."); 546 } catch (Exception e) { 547 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 548 } 549 } 550 } 551 552 void shutdown() { 553 stopped = true; 554 interrupt(); 555 } 556 } 557 558 /** 559 * Start a minidfscluster. 560 * @param servers How many DNs to start. 561 * @see #shutdownMiniDFSCluster() 562 * @return The mini dfs cluster created. 563 */ 564 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 565 return startMiniDFSCluster(servers, null); 566 } 567 568 /** 569 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 570 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 571 * instances of the datanodes will have the same host name. 572 * @param hosts hostnames DNs to run on. 573 * @see #shutdownMiniDFSCluster() 574 * @return The mini dfs cluster created. 575 */ 576 public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception { 577 if (hosts != null && hosts.length != 0) { 578 return startMiniDFSCluster(hosts.length, hosts); 579 } else { 580 return startMiniDFSCluster(1, null); 581 } 582 } 583 584 /** 585 * Start a minidfscluster. Can only create one. 
586 * @param servers How many DNs to start. 587 * @param hosts hostnames DNs to run on. 588 * @see #shutdownMiniDFSCluster() 589 * @return The mini dfs cluster created. 590 */ 591 public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception { 592 return startMiniDFSCluster(servers, null, hosts); 593 } 594 595 private void setFs() throws IOException { 596 if (this.dfsCluster == null) { 597 LOG.info("Skipping setting fs because dfsCluster is null"); 598 return; 599 } 600 FileSystem fs = this.dfsCluster.getFileSystem(); 601 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 602 603 // re-enable this check with dfs 604 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 605 } 606 607 public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[]) 608 throws Exception { 609 createDirsAndSetProperties(); 610 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 611 612 this.dfsCluster = 613 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 614 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 615 this.dfsClusterFixer.start(); 616 // Set this just-started cluster as our filesystem. 
617 setFs(); 618 619 // Wait for the cluster to be totally up 620 this.dfsCluster.waitClusterUp(); 621 622 // reset the test directory for test file system 623 dataTestDirOnTestFS = null; 624 String dataTestDir = getDataTestDir().toString(); 625 conf.set(HConstants.HBASE_DIR, dataTestDir); 626 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 627 628 return this.dfsCluster; 629 } 630 631 public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { 632 createDirsAndSetProperties(); 633 dfsCluster = 634 new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); 635 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 636 this.dfsClusterFixer.start(); 637 return dfsCluster; 638 } 639 640 /** 641 * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to 642 * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in 643 * the conf. 
644 * 645 * <pre> 646 * Configuration conf = TEST_UTIL.getConfiguration(); 647 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 648 * Map.Entry<String, String> e = i.next(); 649 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 650 * } 651 * </pre> 652 */ 653 private void createDirsAndSetProperties() throws IOException { 654 setupClusterTestDir(); 655 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 656 createDirAndSetProperty("test.cache.data"); 657 createDirAndSetProperty("hadoop.tmp.dir"); 658 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 659 createDirAndSetProperty("mapreduce.cluster.local.dir"); 660 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 661 enableShortCircuit(); 662 663 Path root = getDataTestDirOnTestFS("hadoop"); 664 conf.set(MapreduceTestingShim.getMROutputDirProp(), 665 new Path(root, "mapred-output-dir").toString()); 666 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 667 conf.set("mapreduce.jobtracker.staging.root.dir", 668 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 669 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 670 conf.set("yarn.app.mapreduce.am.staging-dir", 671 new Path(root, "mapreduce-am-staging-root-dir").toString()); 672 673 // Frustrate yarn's and hdfs's attempts at writing /tmp. 674 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 
675 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 676 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 677 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 678 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 679 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 680 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 681 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 682 createDirAndSetProperty("dfs.journalnode.edits.dir"); 683 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 684 createDirAndSetProperty("nfs.dump.dir"); 685 createDirAndSetProperty("java.io.tmpdir"); 686 createDirAndSetProperty("dfs.journalnode.edits.dir"); 687 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 688 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 689 } 690 691 /** 692 * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families. 693 * Default to false. 694 */ 695 public boolean isNewVersionBehaviorEnabled() { 696 final String propName = "hbase.tests.new.version.behavior"; 697 String v = System.getProperty(propName); 698 if (v != null) { 699 return Boolean.parseBoolean(v); 700 } 701 return false; 702 } 703 704 /** 705 * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This 706 * allows to specify this parameter on the command line. If not set, default is true. 707 */ 708 public boolean isReadShortCircuitOn() { 709 final String propName = "hbase.tests.use.shortcircuit.reads"; 710 String readOnProp = System.getProperty(propName); 711 if (readOnProp != null) { 712 return Boolean.parseBoolean(readOnProp); 713 } else { 714 return conf.getBoolean(propName, false); 715 } 716 } 717 718 /** 719 * Enable the short circuit read, unless configured differently. 
Set both HBase and HDFS settings, 720 * including skipping the hdfs checksum checks. 721 */ 722 private void enableShortCircuit() { 723 if (isReadShortCircuitOn()) { 724 String curUser = System.getProperty("user.name"); 725 LOG.info("read short circuit is ON for user " + curUser); 726 // read short circuit, for hdfs 727 conf.set("dfs.block.local-path-access.user", curUser); 728 // read short circuit, for hbase 729 conf.setBoolean("dfs.client.read.shortcircuit", true); 730 // Skip checking checksum, for the hdfs client and the datanode 731 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true); 732 } else { 733 LOG.info("read short circuit is OFF"); 734 } 735 } 736 737 private String createDirAndSetProperty(final String property) { 738 return createDirAndSetProperty(property, property); 739 } 740 741 private String createDirAndSetProperty(final String relPath, String property) { 742 String path = getDataTestDir(relPath).toString(); 743 System.setProperty(property, path); 744 conf.set(property, path); 745 new File(path).mkdirs(); 746 LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); 747 return path; 748 } 749 750 /** 751 * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing. 752 */ 753 public void shutdownMiniDFSCluster() throws IOException { 754 if (this.dfsCluster != null) { 755 // The below throws an exception per dn, AsynchronousCloseException. 756 this.dfsCluster.shutdown(); 757 dfsCluster = null; 758 // It is possible that the dfs cluster is set through setDFSCluster method, where we will not 759 // have a fixer 760 if (dfsClusterFixer != null) { 761 this.dfsClusterFixer.shutdown(); 762 dfsClusterFixer = null; 763 } 764 dataTestDirOnTestFS = null; 765 CommonFSUtils.setFsDefault(this.conf, new Path("file:///")); 766 } 767 } 768 769 /** 770 * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately. 
771 * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}. 772 * @param createWALDir Whether to create a new WAL directory. 773 * @return The mini HBase cluster created. 774 * @see #shutdownMiniCluster() 775 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 776 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 777 * @see #startMiniCluster(StartMiniClusterOption) 778 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 779 */ 780 @Deprecated 781 public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception { 782 StartMiniClusterOption option = 783 StartMiniClusterOption.builder().createWALDir(createWALDir).build(); 784 return startMiniCluster(option); 785 } 786 787 /** 788 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 789 * defined in {@link StartMiniClusterOption.Builder}. 790 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 791 * @param createRootDir Whether to create a new root or data directory path. 792 * @return The mini HBase cluster created. 793 * @see #shutdownMiniCluster() 794 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 795 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 796 * @see #startMiniCluster(StartMiniClusterOption) 797 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 798 */ 799 @Deprecated 800 public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception { 801 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves) 802 .numDataNodes(numSlaves).createRootDir(createRootDir).build(); 803 return startMiniCluster(option); 804 } 805 806 /** 807 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 808 * defined in {@link StartMiniClusterOption.Builder}. 
809 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 810 * @param createRootDir Whether to create a new root or data directory path. 811 * @param createWALDir Whether to create a new WAL directory. 812 * @return The mini HBase cluster created. 813 * @see #shutdownMiniCluster() 814 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 815 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 816 * @see #startMiniCluster(StartMiniClusterOption) 817 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 818 */ 819 @Deprecated 820 public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir, 821 boolean createWALDir) throws Exception { 822 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves) 823 .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build(); 824 return startMiniCluster(option); 825 } 826 827 /** 828 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 829 * defined in {@link StartMiniClusterOption.Builder}. 830 * @param numMasters Master node number. 831 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 832 * @param createRootDir Whether to create a new root or data directory path. 833 * @return The mini HBase cluster created. 834 * @see #shutdownMiniCluster() 835 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 836 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 
837 * @see #startMiniCluster(StartMiniClusterOption) 838 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 839 */ 840 @Deprecated 841 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir) 842 throws Exception { 843 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 844 .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build(); 845 return startMiniCluster(option); 846 } 847 848 /** 849 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 850 * defined in {@link StartMiniClusterOption.Builder}. 851 * @param numMasters Master node number. 852 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 853 * @return The mini HBase cluster created. 854 * @see #shutdownMiniCluster() 855 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 856 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 857 * @see #startMiniCluster(StartMiniClusterOption) 858 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 859 */ 860 @Deprecated 861 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception { 862 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 863 .numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 864 return startMiniCluster(option); 865 } 866 867 /** 868 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 869 * defined in {@link StartMiniClusterOption.Builder}. 870 * @param numMasters Master node number. 871 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 872 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 873 * HDFS data node number. 
874 * @param createRootDir Whether to create a new root or data directory path. 875 * @return The mini HBase cluster created. 876 * @see #shutdownMiniCluster() 877 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 878 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 879 * @see #startMiniCluster(StartMiniClusterOption) 880 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 881 */ 882 @Deprecated 883 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts, 884 boolean createRootDir) throws Exception { 885 StartMiniClusterOption option = 886 StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves) 887 .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build(); 888 return startMiniCluster(option); 889 } 890 891 /** 892 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 893 * defined in {@link StartMiniClusterOption.Builder}. 894 * @param numMasters Master node number. 895 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 896 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 897 * HDFS data node number. 898 * @return The mini HBase cluster created. 899 * @see #shutdownMiniCluster() 900 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 901 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 
902 * @see #startMiniCluster(StartMiniClusterOption) 903 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 904 */ 905 @Deprecated 906 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts) 907 throws Exception { 908 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 909 .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build(); 910 return startMiniCluster(option); 911 } 912 913 /** 914 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 915 * defined in {@link StartMiniClusterOption.Builder}. 916 * @param numMasters Master node number. 917 * @param numRegionServers Number of region servers. 918 * @param numDataNodes Number of datanodes. 919 * @return The mini HBase cluster created. 920 * @see #shutdownMiniCluster() 921 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 922 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 923 * @see #startMiniCluster(StartMiniClusterOption) 924 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 925 */ 926 @Deprecated 927 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes) 928 throws Exception { 929 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 930 .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build(); 931 return startMiniCluster(option); 932 } 933 934 /** 935 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 936 * defined in {@link StartMiniClusterOption.Builder}. 937 * @param numMasters Master node number. 938 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 939 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 940 * HDFS data node number. 
941 * @param masterClass The class to use as HMaster, or null for default. 942 * @param rsClass The class to use as HRegionServer, or null for default. 943 * @return The mini HBase cluster created. 944 * @see #shutdownMiniCluster() 945 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 946 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 947 * @see #startMiniCluster(StartMiniClusterOption) 948 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 949 */ 950 @Deprecated 951 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts, 952 Class<? extends HMaster> masterClass, 953 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception { 954 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 955 .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves) 956 .dataNodeHosts(dataNodeHosts).build(); 957 return startMiniCluster(option); 958 } 959 960 /** 961 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 962 * defined in {@link StartMiniClusterOption.Builder}. 963 * @param numMasters Master node number. 964 * @param numRegionServers Number of region servers. 965 * @param numDataNodes Number of datanodes. 966 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will 967 * overwrite HDFS data node number. 968 * @param masterClass The class to use as HMaster, or null for default. 969 * @param rsClass The class to use as HRegionServer, or null for default. 970 * @return The mini HBase cluster created. 971 * @see #shutdownMiniCluster() 972 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 973 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 
974 * @see #startMiniCluster(StartMiniClusterOption) 975 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 976 */ 977 @Deprecated 978 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes, 979 String[] dataNodeHosts, Class<? extends HMaster> masterClass, 980 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception { 981 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 982 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass) 983 .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build(); 984 return startMiniCluster(option); 985 } 986 987 /** 988 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 989 * defined in {@link StartMiniClusterOption.Builder}. 990 * @param numMasters Master node number. 991 * @param numRegionServers Number of region servers. 992 * @param numDataNodes Number of datanodes. 993 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will 994 * overwrite HDFS data node number. 995 * @param masterClass The class to use as HMaster, or null for default. 996 * @param rsClass The class to use as HRegionServer, or null for default. 997 * @param createRootDir Whether to create a new root or data directory path. 998 * @param createWALDir Whether to create a new WAL directory. 999 * @return The mini HBase cluster created. 1000 * @see #shutdownMiniCluster() 1001 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1002 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 1003 * @see #startMiniCluster(StartMiniClusterOption) 1004 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1005 */ 1006 @Deprecated 1007 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes, 1008 String[] dataNodeHosts, Class<? 
extends HMaster> masterClass, 1009 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir, 1010 boolean createWALDir) throws Exception { 1011 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1012 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass) 1013 .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir) 1014 .createWALDir(createWALDir).build(); 1015 return startMiniCluster(option); 1016 } 1017 1018 /** 1019 * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All 1020 * other options will use default values, defined in {@link StartMiniClusterOption.Builder}. 1021 * @param numSlaves slave node number, for both HBase region server and HDFS data node. 1022 * @see #startMiniCluster(StartMiniClusterOption option) 1023 * @see #shutdownMiniDFSCluster() 1024 */ 1025 public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception { 1026 StartMiniClusterOption option = 1027 StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 1028 return startMiniCluster(option); 1029 } 1030 1031 /** 1032 * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default 1033 * value can be found in {@link StartMiniClusterOption.Builder}. 1034 * @see #startMiniCluster(StartMiniClusterOption option) 1035 * @see #shutdownMiniDFSCluster() 1036 */ 1037 public MiniHBaseCluster startMiniCluster() throws Exception { 1038 return startMiniCluster(StartMiniClusterOption.builder().build()); 1039 } 1040 1041 /** 1042 * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies 1043 * Configuration. It homes the cluster data directory under a random subdirectory in a directory 1044 * under System property test.build.data, to be cleaned up on exit. 
1045 * @see #shutdownMiniDFSCluster() 1046 */ 1047 public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception { 1048 LOG.info("Starting up minicluster with option: {}", option); 1049 1050 // If we already put up a cluster, fail. 1051 if (miniClusterRunning) { 1052 throw new IllegalStateException("A mini-cluster is already running"); 1053 } 1054 miniClusterRunning = true; 1055 1056 setupClusterTestDir(); 1057 1058 // Bring up mini dfs cluster. This spews a bunch of warnings about missing 1059 // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. 1060 if (dfsCluster == null) { 1061 LOG.info("STARTING DFS"); 1062 dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts()); 1063 } else { 1064 LOG.info("NOT STARTING DFS"); 1065 } 1066 1067 // Start up a zk cluster. 1068 if (getZkCluster() == null) { 1069 startMiniZKCluster(option.getNumZkServers()); 1070 } 1071 1072 // Start the MiniHBaseCluster 1073 return startMiniHBaseCluster(option); 1074 } 1075 1076 /** 1077 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1078 * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters. 1079 * @return Reference to the hbase mini hbase cluster. 1080 * @see #startMiniCluster(StartMiniClusterOption) 1081 * @see #shutdownMiniHBaseCluster() 1082 */ 1083 public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option) 1084 throws IOException, InterruptedException { 1085 // Now do the mini hbase cluster. Set the hbase.rootdir in config. 1086 createRootDir(option.isCreateRootDir()); 1087 if (option.isCreateWALDir()) { 1088 createWALRootDir(); 1089 } 1090 // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is 1091 // for tests that do not read hbase-defaults.xml 1092 setHBaseFsTmpDir(); 1093 1094 // These settings will make the server waits until this exact number of 1095 // regions servers are connected. 
1096 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { 1097 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers()); 1098 } 1099 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) { 1100 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers()); 1101 } 1102 1103 Configuration c = new Configuration(this.conf); 1104 this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), 1105 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 1106 option.getMasterClass(), option.getRsClass()); 1107 // Populate the master address configuration from mini cluster configuration. 1108 conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c)); 1109 // Don't leave here till we've done a successful scan of the hbase:meta 1110 try (Table t = getConnection().getTable(TableName.META_TABLE_NAME); 1111 ResultScanner s = t.getScanner(new Scan())) { 1112 for (;;) { 1113 if (s.next() == null) { 1114 break; 1115 } 1116 } 1117 } 1118 1119 getAdmin(); // create immediately the hbaseAdmin 1120 LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster()); 1121 1122 return (MiniHBaseCluster) hbaseCluster; 1123 } 1124 1125 /** 1126 * Starts up mini hbase cluster using default options. Default options can be found in 1127 * {@link StartMiniClusterOption.Builder}. 1128 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1129 * @see #shutdownMiniHBaseCluster() 1130 */ 1131 public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException { 1132 return startMiniHBaseCluster(StartMiniClusterOption.builder().build()); 1133 } 1134 1135 /** 1136 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1137 * {@link #startMiniCluster()}. All other options will use default values, defined in 1138 * {@link StartMiniClusterOption.Builder}. 
1139 * @param numMasters Master node number. 1140 * @param numRegionServers Number of region servers. 1141 * @return The mini HBase cluster created. 1142 * @see #shutdownMiniHBaseCluster() 1143 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1144 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 1145 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1146 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1147 */ 1148 @Deprecated 1149 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers) 1150 throws IOException, InterruptedException { 1151 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1152 .numRegionServers(numRegionServers).build(); 1153 return startMiniHBaseCluster(option); 1154 } 1155 1156 /** 1157 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1158 * {@link #startMiniCluster()}. All other options will use default values, defined in 1159 * {@link StartMiniClusterOption.Builder}. 1160 * @param numMasters Master node number. 1161 * @param numRegionServers Number of region servers. 1162 * @param rsPorts Ports that RegionServer should use. 1163 * @return The mini HBase cluster created. 1164 * @see #shutdownMiniHBaseCluster() 1165 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1166 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 
1167 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1168 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1169 */ 1170 @Deprecated 1171 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1172 List<Integer> rsPorts) throws IOException, InterruptedException { 1173 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1174 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 1175 return startMiniHBaseCluster(option); 1176 } 1177 1178 /** 1179 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1180 * {@link #startMiniCluster()}. All other options will use default values, defined in 1181 * {@link StartMiniClusterOption.Builder}. 1182 * @param numMasters Master node number. 1183 * @param numRegionServers Number of region servers. 1184 * @param rsPorts Ports that RegionServer should use. 1185 * @param masterClass The class to use as HMaster, or null for default. 1186 * @param rsClass The class to use as HRegionServer, or null for default. 1187 * @param createRootDir Whether to create a new root or data directory path. 1188 * @param createWALDir Whether to create a new WAL directory. 1189 * @return The mini HBase cluster created. 1190 * @see #shutdownMiniHBaseCluster() 1191 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1192 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 1193 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1194 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1195 */ 1196 @Deprecated 1197 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1198 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 1199 Class<? 
extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir, 1200 boolean createWALDir) throws IOException, InterruptedException { 1201 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1202 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 1203 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 1204 return startMiniHBaseCluster(option); 1205 } 1206 1207 /** 1208 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 1209 * want to keep dfs/zk up and just stop/start hbase. 1210 * @param servers number of region servers 1211 */ 1212 public void restartHBaseCluster(int servers) throws IOException, InterruptedException { 1213 this.restartHBaseCluster(servers, null); 1214 } 1215 1216 public void restartHBaseCluster(int servers, List<Integer> ports) 1217 throws IOException, InterruptedException { 1218 StartMiniClusterOption option = 1219 StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); 1220 restartHBaseCluster(option); 1221 invalidateConnection(); 1222 } 1223 1224 public void restartHBaseCluster(StartMiniClusterOption option) 1225 throws IOException, InterruptedException { 1226 closeConnection(); 1227 this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), 1228 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 1229 option.getMasterClass(), option.getRsClass()); 1230 // Don't leave here till we've done a successful scan of the hbase:meta 1231 Connection conn = ConnectionFactory.createConnection(this.conf); 1232 Table t = conn.getTable(TableName.META_TABLE_NAME); 1233 ResultScanner s = t.getScanner(new Scan()); 1234 while (s.next() != null) { 1235 // do nothing 1236 } 1237 LOG.info("HBase has been restarted"); 1238 s.close(); 1239 t.close(); 1240 conn.close(); 1241 } 1242 1243 /** 1244 * @return Current mini hbase 
cluster. Only has something in it after a call to 1245 * {@link #startMiniCluster()}. 1246 * @see #startMiniCluster() 1247 */ 1248 public MiniHBaseCluster getMiniHBaseCluster() { 1249 if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) { 1250 return (MiniHBaseCluster) this.hbaseCluster; 1251 } 1252 throw new RuntimeException( 1253 hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName()); 1254 } 1255 1256 /** 1257 * Stops mini hbase, zk, and hdfs clusters. 1258 * @see #startMiniCluster(int) 1259 */ 1260 public void shutdownMiniCluster() throws IOException { 1261 LOG.info("Shutting down minicluster"); 1262 shutdownMiniHBaseCluster(); 1263 shutdownMiniDFSCluster(); 1264 shutdownMiniZKCluster(); 1265 1266 cleanupTestDir(); 1267 miniClusterRunning = false; 1268 LOG.info("Minicluster is down"); 1269 } 1270 1271 /** 1272 * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running. 1273 * @throws java.io.IOException in case command is unsuccessful 1274 */ 1275 public void shutdownMiniHBaseCluster() throws IOException { 1276 cleanup(); 1277 if (this.hbaseCluster != null) { 1278 this.hbaseCluster.shutdown(); 1279 // Wait till hbase is down before going on to shutdown zk. 1280 this.hbaseCluster.waitUntilShutDown(); 1281 this.hbaseCluster = null; 1282 } 1283 if (zooKeeperWatcher != null) { 1284 zooKeeperWatcher.close(); 1285 zooKeeperWatcher = null; 1286 } 1287 } 1288 1289 /** 1290 * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running. 
1291 * @throws java.io.IOException throws in case command is unsuccessful 1292 */ 1293 public void killMiniHBaseCluster() throws IOException { 1294 cleanup(); 1295 if (this.hbaseCluster != null) { 1296 getMiniHBaseCluster().killAll(); 1297 this.hbaseCluster = null; 1298 } 1299 if (zooKeeperWatcher != null) { 1300 zooKeeperWatcher.close(); 1301 zooKeeperWatcher = null; 1302 } 1303 } 1304 1305 // close hbase admin, close current connection and reset MIN MAX configs for RS. 1306 private void cleanup() throws IOException { 1307 closeConnection(); 1308 // unset the configuration for MIN and MAX RS to start 1309 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 1310 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1); 1311 } 1312 1313 /** 1314 * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true, 1315 * a new root directory path is fetched irrespective of whether it has been fetched before or not. 1316 * If false, previous path is used. Note: this does not cause the root dir to be created. 1317 * @return Fully qualified path for the default hbase root dir 1318 */ 1319 public Path getDefaultRootDirPath(boolean create) throws IOException { 1320 if (!create) { 1321 return getDataTestDirOnTestFS(); 1322 } else { 1323 return getNewDataTestDirOnTestFS(); 1324 } 1325 } 1326 1327 /** 1328 * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that 1329 * <code>create</code> flag is false. Note: this does not cause the root dir to be created. 1330 * @return Fully qualified path for the default hbase root dir 1331 */ 1332 public Path getDefaultRootDirPath() throws IOException { 1333 return getDefaultRootDirPath(false); 1334 } 1335 1336 /** 1337 * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you 1338 * won't make use of this method. Root hbasedir is created for you as part of mini cluster 1339 * startup. 
You'd only use this method if you were doing manual operation. 1340 * @param create This flag decides whether to get a new root or data directory path or not, if it 1341 * has been fetched already. Note : Directory will be made irrespective of whether 1342 * path has been fetched or not. If directory already exists, it will be overwritten 1343 * @return Fully qualified path to hbase root dir 1344 */ 1345 public Path createRootDir(boolean create) throws IOException { 1346 FileSystem fs = FileSystem.get(this.conf); 1347 Path hbaseRootdir = getDefaultRootDirPath(create); 1348 CommonFSUtils.setRootDir(this.conf, hbaseRootdir); 1349 fs.mkdirs(hbaseRootdir); 1350 FSUtils.setVersion(fs, hbaseRootdir); 1351 return hbaseRootdir; 1352 } 1353 1354 /** 1355 * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that 1356 * <code>create</code> flag is false. 1357 * @return Fully qualified path to hbase root dir 1358 */ 1359 public Path createRootDir() throws IOException { 1360 return createRootDir(false); 1361 } 1362 1363 /** 1364 * Creates a hbase walDir in the user's home directory. Normally you won't make use of this 1365 * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use 1366 * this method if you were doing manual operation. 
1367 * @return Fully qualified path to hbase root dir 1368 */ 1369 public Path createWALRootDir() throws IOException { 1370 FileSystem fs = FileSystem.get(this.conf); 1371 Path walDir = getNewDataTestDirOnTestFS(); 1372 CommonFSUtils.setWALRootDir(this.conf, walDir); 1373 fs.mkdirs(walDir); 1374 return walDir; 1375 } 1376 1377 private void setHBaseFsTmpDir() throws IOException { 1378 String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir"); 1379 if (hbaseFsTmpDirInString == null) { 1380 this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString()); 1381 LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir")); 1382 } else { 1383 LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString); 1384 } 1385 } 1386 1387 /** 1388 * Flushes all caches in the mini hbase cluster 1389 */ 1390 public void flush() throws IOException { 1391 getMiniHBaseCluster().flushcache(); 1392 } 1393 1394 /** 1395 * Flushes all caches in the mini hbase cluster 1396 */ 1397 public void flush(TableName tableName) throws IOException { 1398 getMiniHBaseCluster().flushcache(tableName); 1399 } 1400 1401 /** 1402 * Compact all regions in the mini hbase cluster 1403 */ 1404 public void compact(boolean major) throws IOException { 1405 getMiniHBaseCluster().compact(major); 1406 } 1407 1408 /** 1409 * Compact all of a table's reagion in the mini hbase cluster 1410 */ 1411 public void compact(TableName tableName, boolean major) throws IOException { 1412 getMiniHBaseCluster().compact(tableName, major); 1413 } 1414 1415 /** 1416 * Create a table. 1417 * @return A Table instance for the created table. 1418 */ 1419 public Table createTable(TableName tableName, String family) throws IOException { 1420 return createTable(tableName, new String[] { family }); 1421 } 1422 1423 /** 1424 * Create a table. 1425 * @return A Table instance for the created table. 
1426 */ 1427 public Table createTable(TableName tableName, String[] families) throws IOException { 1428 List<byte[]> fams = new ArrayList<>(families.length); 1429 for (String family : families) { 1430 fams.add(Bytes.toBytes(family)); 1431 } 1432 return createTable(tableName, fams.toArray(new byte[0][])); 1433 } 1434 1435 /** 1436 * Create a table. 1437 * @return A Table instance for the created table. 1438 */ 1439 public Table createTable(TableName tableName, byte[] family) throws IOException { 1440 return createTable(tableName, new byte[][] { family }); 1441 } 1442 1443 /** 1444 * Create a table with multiple regions. 1445 * @return A Table instance for the created table. 1446 */ 1447 public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions) 1448 throws IOException { 1449 if (numRegions < 3) throw new IOException("Must create at least 3 regions"); 1450 byte[] startKey = Bytes.toBytes("aaaaa"); 1451 byte[] endKey = Bytes.toBytes("zzzzz"); 1452 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 1453 1454 return createTable(tableName, new byte[][] { family }, splitKeys); 1455 } 1456 1457 /** 1458 * Create a table. 1459 * @return A Table instance for the created table. 1460 */ 1461 public Table createTable(TableName tableName, byte[][] families) throws IOException { 1462 return createTable(tableName, families, (byte[][]) null); 1463 } 1464 1465 /** 1466 * Create a table with multiple regions. 1467 * @return A Table instance for the created table. 1468 */ 1469 public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { 1470 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE); 1471 } 1472 1473 /** 1474 * Create a table with multiple regions. 1475 * @param replicaCount replica count. 1476 * @return A Table instance for the created table. 
1477 */ 1478 public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families) 1479 throws IOException { 1480 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount); 1481 } 1482 1483 /** 1484 * Create a table. 1485 * @return A Table instance for the created table. 1486 */ 1487 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) 1488 throws IOException { 1489 return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration())); 1490 } 1491 1492 /** 1493 * Create a table. 1494 * @param tableName the table name 1495 * @param families the families 1496 * @param splitKeys the splitkeys 1497 * @param replicaCount the region replica count 1498 * @return A Table instance for the created table. 1499 * @throws IOException throws IOException 1500 */ 1501 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1502 int replicaCount) throws IOException { 1503 return createTable(tableName, families, splitKeys, replicaCount, 1504 new Configuration(getConfiguration())); 1505 } 1506 1507 public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey, 1508 byte[] endKey, int numRegions) throws IOException { 1509 TableDescriptor desc = createTableDescriptor(tableName, families, numVersions); 1510 1511 getAdmin().createTable(desc, startKey, endKey, numRegions); 1512 // HBaseAdmin only waits for regions to appear in hbase:meta we 1513 // should wait until they are assigned 1514 waitUntilAllRegionsAssigned(tableName); 1515 return getConnection().getTable(tableName); 1516 } 1517 1518 /** 1519 * Create a table. 1520 * @param c Configuration to use 1521 * @return A Table instance for the created table. 1522 */ 1523 public Table createTable(TableDescriptor htd, byte[][] families, Configuration c) 1524 throws IOException { 1525 return createTable(htd, families, null, c); 1526 } 1527 1528 /** 1529 * Create a table. 
1530 * @param htd table descriptor 1531 * @param families array of column families 1532 * @param splitKeys array of split keys 1533 * @param c Configuration to use 1534 * @return A Table instance for the created table. 1535 * @throws IOException if getAdmin or createTable fails 1536 */ 1537 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1538 Configuration c) throws IOException { 1539 // Disable blooms (they are on by default as of 0.95) but we disable them here because 1540 // tests have hard coded counts of what to expect in block cache, etc., and blooms being 1541 // on is interfering. 1542 return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c); 1543 } 1544 1545 /** 1546 * Create a table. 1547 * @param htd table descriptor 1548 * @param families array of column families 1549 * @param splitKeys array of split keys 1550 * @param type Bloom type 1551 * @param blockSize block size 1552 * @param c Configuration to use 1553 * @return A Table instance for the created table. 
1554 * @throws IOException if getAdmin or createTable fails 1555 */ 1556 1557 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1558 BloomType type, int blockSize, Configuration c) throws IOException { 1559 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1560 for (byte[] family : families) { 1561 ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family) 1562 .setBloomFilterType(type).setBlocksize(blockSize); 1563 if (isNewVersionBehaviorEnabled()) { 1564 cfdb.setNewVersionBehavior(true); 1565 } 1566 builder.setColumnFamily(cfdb.build()); 1567 } 1568 TableDescriptor td = builder.build(); 1569 if (splitKeys != null) { 1570 getAdmin().createTable(td, splitKeys); 1571 } else { 1572 getAdmin().createTable(td); 1573 } 1574 // HBaseAdmin only waits for regions to appear in hbase:meta 1575 // we should wait until they are assigned 1576 waitUntilAllRegionsAssigned(td.getTableName()); 1577 return getConnection().getTable(td.getTableName()); 1578 } 1579 1580 /** 1581 * Create a table. 1582 * @param htd table descriptor 1583 * @param splitRows array of split keys 1584 * @return A Table instance for the created table. 
1585 */ 1586 public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException { 1587 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1588 if (isNewVersionBehaviorEnabled()) { 1589 for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) { 1590 builder.setColumnFamily( 1591 ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build()); 1592 } 1593 } 1594 if (splitRows != null) { 1595 getAdmin().createTable(builder.build(), splitRows); 1596 } else { 1597 getAdmin().createTable(builder.build()); 1598 } 1599 // HBaseAdmin only waits for regions to appear in hbase:meta 1600 // we should wait until they are assigned 1601 waitUntilAllRegionsAssigned(htd.getTableName()); 1602 return getConnection().getTable(htd.getTableName()); 1603 } 1604 1605 /** 1606 * Create a table. 1607 * @param tableName the table name 1608 * @param families the families 1609 * @param splitKeys the split keys 1610 * @param replicaCount the replica count 1611 * @param c Configuration to use 1612 * @return A Table instance for the created table. 1613 */ 1614 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1615 int replicaCount, final Configuration c) throws IOException { 1616 TableDescriptor htd = 1617 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build(); 1618 return createTable(htd, families, splitKeys, c); 1619 } 1620 1621 /** 1622 * Create a table. 1623 * @return A Table instance for the created table. 1624 */ 1625 public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException { 1626 return createTable(tableName, new byte[][] { family }, numVersions); 1627 } 1628 1629 /** 1630 * Create a table. 1631 * @return A Table instance for the created table. 
1632 */ 1633 public Table createTable(TableName tableName, byte[][] families, int numVersions) 1634 throws IOException { 1635 return createTable(tableName, families, numVersions, (byte[][]) null); 1636 } 1637 1638 /** 1639 * Create a table. 1640 * @return A Table instance for the created table. 1641 */ 1642 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1643 byte[][] splitKeys) throws IOException { 1644 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1645 for (byte[] family : families) { 1646 ColumnFamilyDescriptorBuilder cfBuilder = 1647 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1648 if (isNewVersionBehaviorEnabled()) { 1649 cfBuilder.setNewVersionBehavior(true); 1650 } 1651 builder.setColumnFamily(cfBuilder.build()); 1652 } 1653 if (splitKeys != null) { 1654 getAdmin().createTable(builder.build(), splitKeys); 1655 } else { 1656 getAdmin().createTable(builder.build()); 1657 } 1658 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1659 // assigned 1660 waitUntilAllRegionsAssigned(tableName); 1661 return getConnection().getTable(tableName); 1662 } 1663 1664 /** 1665 * Create a table with multiple regions. 1666 * @return A Table instance for the created table. 1667 */ 1668 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1669 throws IOException { 1670 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1671 } 1672 1673 /** 1674 * Create a table. 1675 * @return A Table instance for the created table. 
1676 */ 1677 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1678 throws IOException { 1679 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1680 for (byte[] family : families) { 1681 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1682 .setMaxVersions(numVersions).setBlocksize(blockSize); 1683 if (isNewVersionBehaviorEnabled()) { 1684 cfBuilder.setNewVersionBehavior(true); 1685 } 1686 builder.setColumnFamily(cfBuilder.build()); 1687 } 1688 getAdmin().createTable(builder.build()); 1689 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1690 // assigned 1691 waitUntilAllRegionsAssigned(tableName); 1692 return getConnection().getTable(tableName); 1693 } 1694 1695 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1696 String cpName) throws IOException { 1697 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1698 for (byte[] family : families) { 1699 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1700 .setMaxVersions(numVersions).setBlocksize(blockSize); 1701 if (isNewVersionBehaviorEnabled()) { 1702 cfBuilder.setNewVersionBehavior(true); 1703 } 1704 builder.setColumnFamily(cfBuilder.build()); 1705 } 1706 if (cpName != null) { 1707 builder.setCoprocessor(cpName); 1708 } 1709 getAdmin().createTable(builder.build()); 1710 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1711 // assigned 1712 waitUntilAllRegionsAssigned(tableName); 1713 return getConnection().getTable(tableName); 1714 } 1715 1716 /** 1717 * Create a table. 1718 * @return A Table instance for the created table. 
1719 */ 1720 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1721 throws IOException { 1722 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1723 int i = 0; 1724 for (byte[] family : families) { 1725 ColumnFamilyDescriptorBuilder cfBuilder = 1726 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1727 if (isNewVersionBehaviorEnabled()) { 1728 cfBuilder.setNewVersionBehavior(true); 1729 } 1730 builder.setColumnFamily(cfBuilder.build()); 1731 i++; 1732 } 1733 getAdmin().createTable(builder.build()); 1734 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1735 // assigned 1736 waitUntilAllRegionsAssigned(tableName); 1737 return getConnection().getTable(tableName); 1738 } 1739 1740 /** 1741 * Create a table. 1742 * @return A Table instance for the created table. 1743 */ 1744 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1745 throws IOException { 1746 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1747 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1748 if (isNewVersionBehaviorEnabled()) { 1749 cfBuilder.setNewVersionBehavior(true); 1750 } 1751 builder.setColumnFamily(cfBuilder.build()); 1752 getAdmin().createTable(builder.build(), splitRows); 1753 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1754 // assigned 1755 waitUntilAllRegionsAssigned(tableName); 1756 return getConnection().getTable(tableName); 1757 } 1758 1759 /** 1760 * Create a table with multiple regions. 1761 * @return A Table instance for the created table. 1762 */ 1763 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1764 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1765 } 1766 1767 /** 1768 * Modify a table, synchronous. 
1769 * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use 1770 * {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now. 1771 * @see Admin#modifyTable(TableDescriptor) 1772 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a> 1773 */ 1774 @Deprecated 1775 public static void modifyTableSync(Admin admin, TableDescriptor desc) 1776 throws IOException, InterruptedException { 1777 admin.modifyTable(desc); 1778 } 1779 1780 /** 1781 * Set the number of Region replicas. 1782 */ 1783 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1784 throws IOException, InterruptedException { 1785 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1786 .setRegionReplication(replicaCount).build(); 1787 admin.modifyTable(desc); 1788 } 1789 1790 /** 1791 * Drop an existing table 1792 * @param tableName existing table 1793 */ 1794 public void deleteTable(TableName tableName) throws IOException { 1795 try { 1796 getAdmin().disableTable(tableName); 1797 } catch (TableNotEnabledException e) { 1798 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1799 } 1800 getAdmin().deleteTable(tableName); 1801 } 1802 1803 /** 1804 * Drop an existing table 1805 * @param tableName existing table 1806 */ 1807 public void deleteTableIfAny(TableName tableName) throws IOException { 1808 try { 1809 deleteTable(tableName); 1810 } catch (TableNotFoundException e) { 1811 // ignore 1812 } 1813 } 1814 1815 // ========================================================================== 1816 // Canned table and table descriptor creation 1817 1818 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1819 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1820 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1821 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1822 private static final int MAXVERSIONS = 3; 1823 1824 
public static final char FIRST_CHAR = 'a'; 1825 public static final char LAST_CHAR = 'z'; 1826 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1827 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1828 1829 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1830 return createModifyableTableDescriptor(TableName.valueOf(name), 1831 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1832 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1833 } 1834 1835 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1836 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1837 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1838 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1839 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1840 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1841 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1842 if (isNewVersionBehaviorEnabled()) { 1843 cfBuilder.setNewVersionBehavior(true); 1844 } 1845 builder.setColumnFamily(cfBuilder.build()); 1846 } 1847 return builder.build(); 1848 } 1849 1850 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1851 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1852 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1853 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1854 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1855 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1856 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1857 if (isNewVersionBehaviorEnabled()) { 1858 cfBuilder.setNewVersionBehavior(true); 1859 
} 1860 builder.setColumnFamily(cfBuilder.build()); 1861 } 1862 return builder; 1863 } 1864 1865 /** 1866 * Create a table of name <code>name</code>. 1867 * @param name Name to give table. 1868 * @return Column descriptor. 1869 */ 1870 public TableDescriptor createTableDescriptor(final TableName name) { 1871 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1872 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1873 } 1874 1875 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1876 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1877 } 1878 1879 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1880 int maxVersions) { 1881 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1882 for (byte[] family : families) { 1883 ColumnFamilyDescriptorBuilder cfBuilder = 1884 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1885 if (isNewVersionBehaviorEnabled()) { 1886 cfBuilder.setNewVersionBehavior(true); 1887 } 1888 builder.setColumnFamily(cfBuilder.build()); 1889 } 1890 return builder.build(); 1891 } 1892 1893 /** 1894 * Create an HRegion that writes to the local tmp dirs 1895 * @param desc a table descriptor indicating which table the region belongs to 1896 * @param startKey the start boundary of the region 1897 * @param endKey the end boundary of the region 1898 * @return a region that writes to local dir for testing 1899 */ 1900 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1901 throws IOException { 1902 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1903 .setEndKey(endKey).build(); 1904 return createLocalHRegion(hri, desc); 1905 } 1906 1907 /** 1908 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. 
Be sure to call 1909 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it. 1910 */ 1911 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1912 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1913 } 1914 1915 /** 1916 * Create an HRegion that writes to the local tmp dirs with specified wal 1917 * @param info regioninfo 1918 * @param conf configuration 1919 * @param desc table descriptor 1920 * @param wal wal for this region. 1921 * @return created hregion 1922 */ 1923 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1924 WAL wal) throws IOException { 1925 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1926 } 1927 1928 /** 1929 * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} 1930 * when done. 1931 */ 1932 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1933 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1934 throws IOException { 1935 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1936 durability, wal, null, families); 1937 } 1938 1939 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1940 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1941 boolean[] compactedMemStore, byte[]... 
families) throws IOException { 1942 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1943 builder.setReadOnly(isReadOnly); 1944 int i = 0; 1945 for (byte[] family : families) { 1946 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1947 if (compactedMemStore != null && i < compactedMemStore.length) { 1948 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1949 } else { 1950 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1951 1952 } 1953 i++; 1954 // Set default to be three versions. 1955 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1956 builder.setColumnFamily(cfBuilder.build()); 1957 } 1958 builder.setDurability(durability); 1959 RegionInfo info = 1960 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1961 return createLocalHRegion(info, conf, builder.build(), wal); 1962 } 1963 1964 // 1965 // ========================================================================== 1966 1967 /** 1968 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1969 * read. 1970 * @param tableName existing table 1971 * @return HTable to that new table 1972 */ 1973 public Table deleteTableData(TableName tableName) throws IOException { 1974 Table table = getConnection().getTable(tableName); 1975 Scan scan = new Scan(); 1976 ResultScanner resScan = table.getScanner(scan); 1977 for (Result res : resScan) { 1978 Delete del = new Delete(res.getRow()); 1979 table.delete(del); 1980 } 1981 resScan = table.getScanner(scan); 1982 resScan.close(); 1983 return table; 1984 } 1985 1986 /** 1987 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1988 * table. 1989 * @param tableName table which must exist. 
1990 * @param preserveRegions keep the existing split points 1991 * @return HTable for the new table 1992 */ 1993 public Table truncateTable(final TableName tableName, final boolean preserveRegions) 1994 throws IOException { 1995 Admin admin = getAdmin(); 1996 if (!admin.isTableDisabled(tableName)) { 1997 admin.disableTable(tableName); 1998 } 1999 admin.truncateTable(tableName, preserveRegions); 2000 return getConnection().getTable(tableName); 2001 } 2002 2003 /** 2004 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 2005 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not 2006 * preserve regions of existing table. 2007 * @param tableName table which must exist. 2008 * @return HTable for the new table 2009 */ 2010 public Table truncateTable(final TableName tableName) throws IOException { 2011 return truncateTable(tableName, false); 2012 } 2013 2014 /** 2015 * Load table with rows from 'aaa' to 'zzz'. 2016 * @param t Table 2017 * @param f Family 2018 * @return Count of rows loaded. 2019 */ 2020 public int loadTable(final Table t, final byte[] f) throws IOException { 2021 return loadTable(t, new byte[][] { f }); 2022 } 2023 2024 /** 2025 * Load table with rows from 'aaa' to 'zzz'. 2026 * @param t Table 2027 * @param f Family 2028 * @return Count of rows loaded. 2029 */ 2030 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 2031 return loadTable(t, new byte[][] { f }, null, writeToWAL); 2032 } 2033 2034 /** 2035 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 2036 * @param t Table 2037 * @param f Array of Families to load 2038 * @return Count of rows loaded. 2039 */ 2040 public int loadTable(final Table t, final byte[][] f) throws IOException { 2041 return loadTable(t, f, null); 2042 } 2043 2044 /** 2045 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 
2046 * @param t Table 2047 * @param f Array of Families to load 2048 * @param value the values of the cells. If null is passed, the row key is used as value 2049 * @return Count of rows loaded. 2050 */ 2051 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 2052 return loadTable(t, f, value, true); 2053 } 2054 2055 /** 2056 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 2057 * @param t Table 2058 * @param f Array of Families to load 2059 * @param value the values of the cells. If null is passed, the row key is used as value 2060 * @return Count of rows loaded. 2061 */ 2062 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 2063 throws IOException { 2064 List<Put> puts = new ArrayList<>(); 2065 for (byte[] row : HBaseTestingUtility.ROWS) { 2066 Put put = new Put(row); 2067 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 2068 for (int i = 0; i < f.length; i++) { 2069 byte[] value1 = value != null ? 
value : row; 2070 put.addColumn(f[i], f[i], value1); 2071 } 2072 puts.add(put); 2073 } 2074 t.put(puts); 2075 return puts.size(); 2076 } 2077 2078 /** 2079 * A tracker for tracking and validating table rows generated with 2080 * {@link HBaseTestingUtility#loadTable(Table, byte[])} 2081 */ 2082 public static class SeenRowTracker { 2083 int dim = 'z' - 'a' + 1; 2084 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 2085 byte[] startRow; 2086 byte[] stopRow; 2087 2088 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 2089 this.startRow = startRow; 2090 this.stopRow = stopRow; 2091 } 2092 2093 void reset() { 2094 for (byte[] row : ROWS) { 2095 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 2096 } 2097 } 2098 2099 int i(byte b) { 2100 return b - 'a'; 2101 } 2102 2103 public void addRow(byte[] row) { 2104 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 2105 } 2106 2107 /** 2108 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 2109 * rows none 2110 */ 2111 public void validate() { 2112 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2113 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2114 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2115 int count = seenRows[i(b1)][i(b2)][i(b3)]; 2116 int expectedCount = 0; 2117 if ( 2118 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 2119 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 2120 ) { 2121 expectedCount = 1; 2122 } 2123 if (count != expectedCount) { 2124 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 2125 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 2126 + "instead of " + expectedCount); 2127 } 2128 } 2129 } 2130 } 2131 } 2132 } 2133 2134 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 2135 return loadRegion(r, f, false); 2136 } 2137 2138 public int loadRegion(final Region r, final byte[] f) throws IOException { 2139 return loadRegion((HRegion) r, f); 
2140 } 2141 2142 /** 2143 * Load region with rows from 'aaa' to 'zzz'. 2144 * @param r Region 2145 * @param f Family 2146 * @param flush flush the cache if true 2147 * @return Count of rows loaded. 2148 */ 2149 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 2150 byte[] k = new byte[3]; 2151 int rowCount = 0; 2152 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2153 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2154 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2155 k[0] = b1; 2156 k[1] = b2; 2157 k[2] = b3; 2158 Put put = new Put(k); 2159 put.setDurability(Durability.SKIP_WAL); 2160 put.addColumn(f, null, k); 2161 if (r.getWAL() == null) { 2162 put.setDurability(Durability.SKIP_WAL); 2163 } 2164 int preRowCount = rowCount; 2165 int pause = 10; 2166 int maxPause = 1000; 2167 while (rowCount == preRowCount) { 2168 try { 2169 r.put(put); 2170 rowCount++; 2171 } catch (RegionTooBusyException e) { 2172 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 2173 Threads.sleep(pause); 2174 } 2175 } 2176 } 2177 } 2178 if (flush) { 2179 r.flush(true); 2180 } 2181 } 2182 return rowCount; 2183 } 2184 2185 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2186 throws IOException { 2187 for (int i = startRow; i < endRow; i++) { 2188 byte[] data = Bytes.toBytes(String.valueOf(i)); 2189 Put put = new Put(data); 2190 put.addColumn(f, null, data); 2191 t.put(put); 2192 } 2193 } 2194 2195 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 2196 throws IOException { 2197 byte[] row = new byte[rowSize]; 2198 for (int i = 0; i < totalRows; i++) { 2199 Bytes.random(row); 2200 Put put = new Put(row); 2201 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 2202 t.put(put); 2203 } 2204 } 2205 2206 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 2207 int replicaId) throws IOException { 2208 for (int i = startRow; i < endRow; i++) { 2209 String failMsg = 
"Failed verification of row :" + i; 2210 byte[] data = Bytes.toBytes(String.valueOf(i)); 2211 Get get = new Get(data); 2212 get.setReplicaId(replicaId); 2213 get.setConsistency(Consistency.TIMELINE); 2214 Result result = table.get(get); 2215 if (!result.containsColumn(f, null)) { 2216 throw new AssertionError(failMsg); 2217 } 2218 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2219 Cell cell = result.getColumnLatestCell(f, null); 2220 if ( 2221 !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), 2222 cell.getValueLength()) 2223 ) { 2224 throw new AssertionError(failMsg); 2225 } 2226 } 2227 } 2228 2229 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 2230 throws IOException { 2231 verifyNumericRows((HRegion) region, f, startRow, endRow); 2232 } 2233 2234 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 2235 throws IOException { 2236 verifyNumericRows(region, f, startRow, endRow, true); 2237 } 2238 2239 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 2240 final boolean present) throws IOException { 2241 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 2242 } 2243 2244 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 2245 final boolean present) throws IOException { 2246 for (int i = startRow; i < endRow; i++) { 2247 String failMsg = "Failed verification of row :" + i; 2248 byte[] data = Bytes.toBytes(String.valueOf(i)); 2249 Result result = region.get(new Get(data)); 2250 2251 boolean hasResult = result != null && !result.isEmpty(); 2252 if (present != hasResult) { 2253 throw new AssertionError( 2254 failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">"); 2255 } 2256 if (!present) continue; 2257 2258 if (!result.containsColumn(f, null)) { 2259 throw new AssertionError(failMsg); 2260 } 2261 assertEquals(failMsg, 1, 
result.getColumnCells(f, null).size()); 2262 Cell cell = result.getColumnLatestCell(f, null); 2263 if ( 2264 !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(), 2265 cell.getValueLength()) 2266 ) { 2267 throw new AssertionError(failMsg); 2268 } 2269 } 2270 } 2271 2272 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2273 throws IOException { 2274 for (int i = startRow; i < endRow; i++) { 2275 byte[] data = Bytes.toBytes(String.valueOf(i)); 2276 Delete delete = new Delete(data); 2277 delete.addFamily(f); 2278 t.delete(delete); 2279 } 2280 } 2281 2282 /** 2283 * Return the number of rows in the given table. 2284 * @param table to count rows 2285 * @return count of rows 2286 */ 2287 public static int countRows(final Table table) throws IOException { 2288 return countRows(table, new Scan()); 2289 } 2290 2291 public static int countRows(final Table table, final Scan scan) throws IOException { 2292 try (ResultScanner results = table.getScanner(scan)) { 2293 int count = 0; 2294 while (results.next() != null) { 2295 count++; 2296 } 2297 return count; 2298 } 2299 } 2300 2301 public int countRows(final Table table, final byte[]... families) throws IOException { 2302 Scan scan = new Scan(); 2303 for (byte[] family : families) { 2304 scan.addFamily(family); 2305 } 2306 return countRows(table, scan); 2307 } 2308 2309 /** 2310 * Return the number of rows in the given table. 
2311 */ 2312 public int countRows(final TableName tableName) throws IOException { 2313 Table table = getConnection().getTable(tableName); 2314 try { 2315 return countRows(table); 2316 } finally { 2317 table.close(); 2318 } 2319 } 2320 2321 public int countRows(final Region region) throws IOException { 2322 return countRows(region, new Scan()); 2323 } 2324 2325 public int countRows(final Region region, final Scan scan) throws IOException { 2326 InternalScanner scanner = region.getScanner(scan); 2327 try { 2328 return countRows(scanner); 2329 } finally { 2330 scanner.close(); 2331 } 2332 } 2333 2334 public int countRows(final InternalScanner scanner) throws IOException { 2335 int scannedCount = 0; 2336 List<Cell> results = new ArrayList<>(); 2337 boolean hasMore = true; 2338 while (hasMore) { 2339 hasMore = scanner.next(results); 2340 scannedCount += results.size(); 2341 results.clear(); 2342 } 2343 return scannedCount; 2344 } 2345 2346 /** 2347 * Return an md5 digest of the entire contents of a table. 
2348 */ 2349 public String checksumRows(final Table table) throws Exception { 2350 2351 Scan scan = new Scan(); 2352 ResultScanner results = table.getScanner(scan); 2353 MessageDigest digest = MessageDigest.getInstance("MD5"); 2354 for (Result res : results) { 2355 digest.update(res.getRow()); 2356 } 2357 results.close(); 2358 return digest.toString(); 2359 } 2360 2361 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2362 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2363 static { 2364 int i = 0; 2365 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2366 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2367 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2368 ROWS[i][0] = b1; 2369 ROWS[i][1] = b2; 2370 ROWS[i][2] = b3; 2371 i++; 2372 } 2373 } 2374 } 2375 } 2376 2377 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2378 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2379 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2380 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2381 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2382 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2383 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2384 2385 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2386 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2387 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2388 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2389 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2390 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 
Bytes.toBytes("vvv"), 2391 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2392 2393 /** 2394 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2395 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 2396 * @return list of region info for regions added to meta 2397 */ 2398 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2399 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2400 Table meta = getConnection().getTable(TableName.META_TABLE_NAME); 2401 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2402 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2403 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2404 TableState.State.ENABLED); 2405 // add custom ones 2406 for (int i = 0; i < startKeys.length; i++) { 2407 int j = (i + 1) % startKeys.length; 2408 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2409 .setEndKey(startKeys[j]).build(); 2410 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2411 newRegions.add(hri); 2412 } 2413 2414 meta.close(); 2415 return newRegions; 2416 } 2417 2418 /** 2419 * Create an unmanaged WAL. Be sure to close it when you're through. 2420 */ 2421 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri) 2422 throws IOException { 2423 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 2424 // unless I pass along via the conf. 2425 Configuration confForWAL = new Configuration(conf); 2426 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 2427 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); 2428 } 2429 2430 /** 2431 * Create a region with it's own WAL. 
Be sure to call 2432 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2433 */ 2434 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2435 final Configuration conf, final TableDescriptor htd) throws IOException { 2436 return createRegionAndWAL(info, rootDir, conf, htd, true); 2437 } 2438 2439 /** 2440 * Create a region with it's own WAL. Be sure to call 2441 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2442 */ 2443 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2444 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException { 2445 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2446 region.setBlockCache(blockCache); 2447 region.initialize(); 2448 return region; 2449 } 2450 2451 /** 2452 * Create a region with it's own WAL. Be sure to call 2453 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2454 */ 2455 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2456 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache) 2457 throws IOException { 2458 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2459 region.setMobFileCache(mobFileCache); 2460 region.initialize(); 2461 return region; 2462 } 2463 2464 /** 2465 * Create a region with it's own WAL. Be sure to call 2466 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 
2467 */ 2468 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2469 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2470 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2471 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2472 WAL wal = createWal(conf, rootDir, info); 2473 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2474 } 2475 2476 /** 2477 * Returns all rows from the hbase:meta table. 2478 * @throws IOException When reading the rows fails. 2479 */ 2480 public List<byte[]> getMetaTableRows() throws IOException { 2481 // TODO: Redo using MetaTableAccessor class 2482 Table t = getConnection().getTable(TableName.META_TABLE_NAME); 2483 List<byte[]> rows = new ArrayList<>(); 2484 ResultScanner s = t.getScanner(new Scan()); 2485 for (Result result : s) { 2486 LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow())); 2487 rows.add(result.getRow()); 2488 } 2489 s.close(); 2490 t.close(); 2491 return rows; 2492 } 2493 2494 /** 2495 * Returns all rows from the hbase:meta table for a given user table 2496 * @throws IOException When reading the rows fails. 2497 */ 2498 public List<byte[]> getMetaTableRows(TableName tableName) throws IOException { 2499 // TODO: Redo using MetaTableAccessor. 2500 Table t = getConnection().getTable(TableName.META_TABLE_NAME); 2501 List<byte[]> rows = new ArrayList<>(); 2502 ResultScanner s = t.getScanner(new Scan()); 2503 for (Result result : s) { 2504 RegionInfo info = CatalogFamilyFormat.getRegionInfo(result); 2505 if (info == null) { 2506 LOG.error("No region info for row " + Bytes.toString(result.getRow())); 2507 // TODO figure out what to do for this new hosed case. 
2508 continue; 2509 } 2510 2511 if (info.getTable().equals(tableName)) { 2512 LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info); 2513 rows.add(result.getRow()); 2514 } 2515 } 2516 s.close(); 2517 t.close(); 2518 return rows; 2519 } 2520 2521 /** 2522 * Returns all regions of the specified table 2523 * @param tableName the table name 2524 * @return all regions of the specified table 2525 * @throws IOException when getting the regions fails. 2526 */ 2527 private List<RegionInfo> getRegions(TableName tableName) throws IOException { 2528 try (Admin admin = getConnection().getAdmin()) { 2529 return admin.getRegions(tableName); 2530 } 2531 } 2532 2533 /** 2534 * Find any other region server which is different from the one identified by parameter 2535 * @return another region server 2536 */ 2537 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2538 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2539 if (!(rst.getRegionServer() == rs)) { 2540 return rst.getRegionServer(); 2541 } 2542 } 2543 return null; 2544 } 2545 2546 /** 2547 * Tool to get the reference to the region server object that holds the region of the specified 2548 * user table. 
2549 * @param tableName user table to lookup in hbase:meta 2550 * @return region server that holds it, null if the row doesn't exist 2551 */ 2552 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2553 throws IOException, InterruptedException { 2554 List<RegionInfo> regions = getRegions(tableName); 2555 if (regions == null || regions.isEmpty()) { 2556 return null; 2557 } 2558 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2559 2560 byte[] firstRegionName = 2561 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2562 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2563 2564 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2565 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2566 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2567 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2568 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2569 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2570 while (retrier.shouldRetry()) { 2571 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2572 if (index != -1) { 2573 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2574 } 2575 // Came back -1. Region may not be online yet. Sleep a while. 2576 retrier.sleepUntilNextRetry(); 2577 } 2578 return null; 2579 } 2580 2581 /** 2582 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2583 * @throws IOException When starting the cluster fails. 2584 */ 2585 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2586 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 
2587 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2588 "99.0"); 2589 startMiniMapReduceCluster(2); 2590 return mrCluster; 2591 } 2592 2593 /** 2594 * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its 2595 * internal static LOG_DIR variable. 2596 */ 2597 private void forceChangeTaskLogDir() { 2598 Field logDirField; 2599 try { 2600 logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); 2601 logDirField.setAccessible(true); 2602 2603 Field modifiersField = ReflectionUtils.getModifiersField(); 2604 modifiersField.setAccessible(true); 2605 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); 2606 2607 logDirField.set(null, new File(hadoopLogDir, "userlogs")); 2608 } catch (SecurityException e) { 2609 throw new RuntimeException(e); 2610 } catch (NoSuchFieldException e) { 2611 throw new RuntimeException(e); 2612 } catch (IllegalArgumentException e) { 2613 throw new RuntimeException(e); 2614 } catch (IllegalAccessException e) { 2615 throw new RuntimeException(e); 2616 } 2617 } 2618 2619 /** 2620 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2621 * filesystem. 2622 * @param servers The number of <code>TaskTracker</code>'s to start. 2623 * @throws IOException When starting the cluster fails. 2624 */ 2625 private void startMiniMapReduceCluster(final int servers) throws IOException { 2626 if (mrCluster != null) { 2627 throw new IllegalStateException("MiniMRCluster is already running"); 2628 } 2629 LOG.info("Starting mini mapreduce cluster..."); 2630 setupClusterTestDir(); 2631 createDirsAndSetProperties(); 2632 2633 forceChangeTaskLogDir(); 2634 2635 //// hadoop2 specific settings 2636 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2637 // we up the VM usable so that processes don't get killed. 
2638 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2639 2640 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2641 // this avoids the problem by disabling speculative task execution in tests. 2642 conf.setBoolean("mapreduce.map.speculative", false); 2643 conf.setBoolean("mapreduce.reduce.speculative", false); 2644 //// 2645 2646 // Allow the user to override FS URI for this map-reduce cluster to use. 2647 mrCluster = 2648 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2649 1, null, null, new JobConf(this.conf)); 2650 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2651 if (jobConf == null) { 2652 jobConf = mrCluster.createJobConf(); 2653 } 2654 // Hadoop MiniMR overwrites this while it should not 2655 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2656 LOG.info("Mini mapreduce cluster started"); 2657 2658 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2659 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2660 // necessary config properties here. YARN-129 required adding a few properties. 
2661 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2662 // this for mrv2 support; mr1 ignores this 2663 conf.set("mapreduce.framework.name", "yarn"); 2664 conf.setBoolean("yarn.is.minicluster", true); 2665 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2666 if (rmAddress != null) { 2667 conf.set("yarn.resourcemanager.address", rmAddress); 2668 } 2669 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2670 if (historyAddress != null) { 2671 conf.set("mapreduce.jobhistory.address", historyAddress); 2672 } 2673 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2674 if (schedulerAddress != null) { 2675 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2676 } 2677 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2678 if (mrJobHistoryWebappAddress != null) { 2679 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2680 } 2681 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2682 if (yarnRMWebappAddress != null) { 2683 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2684 } 2685 } 2686 2687 /** 2688 * Stops the previously started <code>MiniMRCluster</code>. 2689 */ 2690 public void shutdownMiniMapReduceCluster() { 2691 if (mrCluster != null) { 2692 LOG.info("Stopping mini mapreduce cluster..."); 2693 mrCluster.shutdown(); 2694 mrCluster = null; 2695 LOG.info("Mini mapreduce cluster stopped"); 2696 } 2697 // Restore configuration to point to local jobtracker 2698 conf.set("mapreduce.jobtracker.address", "local"); 2699 } 2700 2701 /** 2702 * Create a stubbed out RegionServerService, mainly for getting FS. 
2703 */ 2704 public RegionServerServices createMockRegionServerService() throws IOException { 2705 return createMockRegionServerService((ServerName) null); 2706 } 2707 2708 /** 2709 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2710 * TestTokenAuthentication 2711 */ 2712 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) 2713 throws IOException { 2714 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher()); 2715 rss.setFileSystem(getTestFileSystem()); 2716 rss.setRpcServer(rpc); 2717 return rss; 2718 } 2719 2720 /** 2721 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2722 * TestOpenRegionHandler 2723 */ 2724 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException { 2725 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name); 2726 rss.setFileSystem(getTestFileSystem()); 2727 return rss; 2728 } 2729 2730 /** 2731 * Switches the logger for the given class to DEBUG level. 2732 * @param clazz The class for which to switch to debug logging. 2733 * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as 2734 * HBase only uses log4j. You should do this by your own as it you know which log 2735 * framework you are using then set the log level to debug is very easy. 
2736 */ 2737 @Deprecated 2738 public void enableDebug(Class<?> clazz) { 2739 Log4jUtils.enableDebug(clazz); 2740 } 2741 2742 /** 2743 * Expire the Master's session 2744 */ 2745 public void expireMasterSession() throws Exception { 2746 HMaster master = getMiniHBaseCluster().getMaster(); 2747 expireSession(master.getZooKeeper(), false); 2748 } 2749 2750 /** 2751 * Expire a region server's session 2752 * @param index which RS 2753 */ 2754 public void expireRegionServerSession(int index) throws Exception { 2755 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); 2756 expireSession(rs.getZooKeeper(), false); 2757 decrementMinRegionServerCount(); 2758 } 2759 2760 private void decrementMinRegionServerCount() { 2761 // decrement the count for this.conf, for newly spwaned master 2762 // this.hbaseCluster shares this configuration too 2763 decrementMinRegionServerCount(getConfiguration()); 2764 2765 // each master thread keeps a copy of configuration 2766 for (MasterThread master : getHBaseCluster().getMasterThreads()) { 2767 decrementMinRegionServerCount(master.getMaster().getConfiguration()); 2768 } 2769 } 2770 2771 private void decrementMinRegionServerCount(Configuration conf) { 2772 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 2773 if (currentCount != -1) { 2774 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1)); 2775 } 2776 } 2777 2778 public void expireSession(ZKWatcher nodeZK) throws Exception { 2779 expireSession(nodeZK, false); 2780 } 2781 2782 /** 2783 * Expire a ZooKeeper session as recommended in ZooKeeper documentation 2784 * http://hbase.apache.org/book.html#trouble.zookeeper There are issues when doing this: [1] 2785 * http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html [2] 2786 * https://issues.apache.org/jira/browse/ZOOKEEPER-1105 2787 * @param nodeZK - the ZK watcher to expire 2788 * @param checkStatus - true to check if we can create a Table with the 
current configuration. 2789 */ 2790 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception { 2791 Configuration c = new Configuration(this.conf); 2792 String quorumServers = ZKConfig.getZKQuorumServersString(c); 2793 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); 2794 byte[] password = zk.getSessionPasswd(); 2795 long sessionID = zk.getSessionId(); 2796 2797 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]), 2798 // so we create a first watcher to be sure that the 2799 // event was sent. We expect that if our watcher receives the event 2800 // other watchers on the same machine will get is as well. 2801 // When we ask to close the connection, ZK does not close it before 2802 // we receive all the events, so don't have to capture the event, just 2803 // closing the connection should be enough. 2804 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() { 2805 @Override 2806 public void process(WatchedEvent watchedEvent) { 2807 LOG.info("Monitor ZKW received event=" + watchedEvent); 2808 } 2809 }, sessionID, password); 2810 2811 // Making it expire 2812 ZooKeeper newZK = 2813 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password); 2814 2815 // ensure that we have connection to the server before closing down, otherwise 2816 // the close session event will be eaten out before we start CONNECTING state 2817 long start = EnvironmentEdgeManager.currentTime(); 2818 while ( 2819 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000 2820 ) { 2821 Thread.sleep(1); 2822 } 2823 newZK.close(); 2824 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID)); 2825 2826 // Now closing & waiting to be sure that the clients get it. 2827 monitor.close(); 2828 2829 if (checkStatus) { 2830 getConnection().getTable(TableName.META_TABLE_NAME).close(); 2831 } 2832 } 2833 2834 /** 2835 * Get the Mini HBase cluster. 
2836 * @return hbase cluster 2837 * @see #getHBaseClusterInterface() 2838 */ 2839 public MiniHBaseCluster getHBaseCluster() { 2840 return getMiniHBaseCluster(); 2841 } 2842 2843 /** 2844 * Returns the HBaseCluster instance. 2845 * <p> 2846 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this 2847 * should not assume that the cluster is a mini cluster or a distributed one. If the test only 2848 * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used 2849 * instead w/o the need to type-cast. 2850 */ 2851 public HBaseCluster getHBaseClusterInterface() { 2852 // implementation note: we should rename this method as #getHBaseCluster(), 2853 // but this would require refactoring 90+ calls. 2854 return hbaseCluster; 2855 } 2856 2857 /** 2858 * Resets the connections so that the next time getConnection() is called, a new connection is 2859 * created. This is needed in cases where the entire cluster / all the masters are shutdown and 2860 * the connection is not valid anymore. TODO: There should be a more coherent way of doing this. 2861 * Unfortunately the way tests are written, not all start() stop() calls go through this class. 2862 * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult 2863 * for this wrapper class to maintain the connection state automatically. Cleaning this is a much 2864 * bigger refactor. 2865 */ 2866 public void invalidateConnection() throws IOException { 2867 closeConnection(); 2868 // Update the master addresses if they changed. 2869 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY); 2870 final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY); 2871 LOG.info("Invalidated connection. 
Updating master addresses before: {} after: {}", 2872 masterConfigBefore, masterConfAfter); 2873 conf.set(HConstants.MASTER_ADDRS_KEY, 2874 getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY)); 2875 } 2876 2877 /** 2878 * Get a shared Connection to the cluster. this method is thread safe. 2879 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. 2880 */ 2881 public Connection getConnection() throws IOException { 2882 return getAsyncConnection().toConnection(); 2883 } 2884 2885 /** 2886 * Get a assigned Connection to the cluster. this method is thread safe. 2887 * @param user assigned user 2888 * @return A Connection with assigned user. 2889 */ 2890 public Connection getConnection(User user) throws IOException { 2891 return getAsyncConnection(user).toConnection(); 2892 } 2893 2894 /** 2895 * Get a shared AsyncClusterConnection to the cluster. this method is thread safe. 2896 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown 2897 * of cluster. 2898 */ 2899 public AsyncClusterConnection getAsyncConnection() throws IOException { 2900 try { 2901 return asyncConnection.updateAndGet(connection -> { 2902 if (connection == null) { 2903 try { 2904 User user = UserProvider.instantiate(conf).getCurrent(); 2905 connection = getAsyncConnection(user); 2906 } catch (IOException ioe) { 2907 throw new UncheckedIOException("Failed to create connection", ioe); 2908 } 2909 } 2910 return connection; 2911 }); 2912 } catch (UncheckedIOException exception) { 2913 throw exception.getCause(); 2914 } 2915 } 2916 2917 /** 2918 * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe. 2919 * @param user assigned user 2920 * @return An AsyncClusterConnection with assigned user. 
2921 */ 2922 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2923 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2924 } 2925 2926 public void closeConnection() throws IOException { 2927 if (hbaseAdmin != null) { 2928 Closeables.close(hbaseAdmin, true); 2929 hbaseAdmin = null; 2930 } 2931 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2932 if (asyncConnection != null) { 2933 Closeables.close(asyncConnection, true); 2934 } 2935 } 2936 2937 /** 2938 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2939 * it has no effect, it will be closed automatically when the cluster shutdowns 2940 */ 2941 public Admin getAdmin() throws IOException { 2942 if (hbaseAdmin == null) { 2943 this.hbaseAdmin = getConnection().getAdmin(); 2944 } 2945 return hbaseAdmin; 2946 } 2947 2948 public KeymetaAdminClient getKeymetaAdmin() throws IOException { 2949 return new KeymetaAdminClient(getConnection()); 2950 } 2951 2952 /** 2953 * Returns an {@link Hbck} instance. Needs be closed when done. 2954 */ 2955 public Hbck getHbck() throws IOException { 2956 return getConnection().getHbck(); 2957 } 2958 2959 /** 2960 * Unassign the named region. 2961 * @param regionName The region to unassign. 2962 */ 2963 public void unassignRegion(String regionName) throws IOException { 2964 unassignRegion(Bytes.toBytes(regionName)); 2965 } 2966 2967 /** 2968 * Unassign the named region. 2969 * @param regionName The region to unassign. 2970 */ 2971 public void unassignRegion(byte[] regionName) throws IOException { 2972 getAdmin().unassign(regionName, true); 2973 } 2974 2975 /** 2976 * Closes the region containing the given row. 2977 * @param row The row to find the containing region. 2978 * @param table The table to find the region. 
2979 */ 2980 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 2981 unassignRegionByRow(Bytes.toBytes(row), table); 2982 } 2983 2984 /** 2985 * Closes the region containing the given row. 2986 * @param row The row to find the containing region. 2987 * @param table The table to find the region. 2988 */ 2989 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 2990 HRegionLocation hrl = table.getRegionLocation(row); 2991 unassignRegion(hrl.getRegion().getRegionName()); 2992 } 2993 2994 /** 2995 * Retrieves a splittable region randomly from tableName 2996 * @param tableName name of table 2997 * @param maxAttempts maximum number of attempts, unlimited for value of -1 2998 * @return the HRegion chosen, null if none was found within limit of maxAttempts 2999 */ 3000 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 3001 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 3002 int regCount = regions.size(); 3003 Set<Integer> attempted = new HashSet<>(); 3004 int idx; 3005 int attempts = 0; 3006 do { 3007 regions = getHBaseCluster().getRegions(tableName); 3008 if (regCount != regions.size()) { 3009 // if there was region movement, clear attempted Set 3010 attempted.clear(); 3011 } 3012 regCount = regions.size(); 3013 // There are chances that before we get the region for the table from an RS the region may 3014 // be going for CLOSE. 
This may be because online schema change is enabled 3015 if (regCount > 0) { 3016 idx = ThreadLocalRandom.current().nextInt(regCount); 3017 // if we have just tried this region, there is no need to try again 3018 if (attempted.contains(idx)) { 3019 continue; 3020 } 3021 HRegion region = regions.get(idx); 3022 if (region.checkSplit().isPresent()) { 3023 return region; 3024 } 3025 attempted.add(idx); 3026 } 3027 attempts++; 3028 } while (maxAttempts == -1 || attempts < maxAttempts); 3029 return null; 3030 } 3031 3032 public MiniDFSCluster getDFSCluster() { 3033 return dfsCluster; 3034 } 3035 3036 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException { 3037 setDFSCluster(cluster, true); 3038 } 3039 3040 /** 3041 * Set the MiniDFSCluster 3042 * @param cluster cluster to use 3043 * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it 3044 * is set. 3045 * @throws IllegalStateException if the passed cluster is up when it is required to be down 3046 * @throws IOException if the FileSystem could not be set from the passed dfs cluster 3047 */ 3048 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown) 3049 throws IllegalStateException, IOException { 3050 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) { 3051 throw new IllegalStateException("DFSCluster is already running! Shut it down first."); 3052 } 3053 this.dfsCluster = cluster; 3054 this.setFs(); 3055 } 3056 3057 public FileSystem getTestFileSystem() throws IOException { 3058 return HFileSystem.get(conf); 3059 } 3060 3061 /** 3062 * Wait until all regions in a table have been assigned. Waits default timeout before giving up 3063 * (30 seconds). 3064 * @param table Table to wait on. 
*/
  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
    waitTableAvailable(table.getName(), 30000);
  }

  /**
   * Waits until the table-available predicate holds for the table.
   * @param table         Table to wait on.
   * @param timeoutMillis Timeout.
   */
  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }

  /**
   * Wait until all regions in a table have been assigned
   * @param table         Table to wait on.
   * @param timeoutMillis Timeout.
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
  }

  /**
   * Builds a diagnostic message describing why a table may not be available: the table state as
   * reported by {@link #explainTableState(TableName, TableState.State)}, followed, when the
   * master is alive, by one entry per region whose location in meta disagrees with the
   * assignment manager.
   * @param tableName table to describe
   * @return the diagnostic message
   */
  public String explainTableAvailability(TableName tableName) throws IOException {
    StringBuilder msg =
      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
    if (getHBaseCluster().getMaster().isAlive()) {
      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
        .getRegionStates().getRegionAssignments();
      final List<Pair<RegionInfo, ServerName>> metaLocations =
        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
        RegionInfo hri = metaLocation.getFirst();
        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          // Close the parenthesis opened before the meta server name.
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM ( ").append(sn)
            .append(" <> ").append(assignments.get(hri)).append(" )");
        }
      }
    }
    return msg.toString();
  }
3110 3111 public String explainTableState(final TableName table, TableState.State state) 3112 throws IOException { 3113 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); 3114 if (tableState == null) { 3115 return "TableState in META: No table state in META for table " + table 3116 + " last state in meta (including deleted is " + findLastTableState(table) + ")"; 3117 } else if (!tableState.inStates(state)) { 3118 return "TableState in META: Not " + state + " state, but " + tableState; 3119 } else { 3120 return "TableState in META: OK"; 3121 } 3122 } 3123 3124 public TableState findLastTableState(final TableName table) throws IOException { 3125 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null); 3126 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 3127 @Override 3128 public boolean visit(Result r) throws IOException { 3129 if (!Arrays.equals(r.getRow(), table.getName())) { 3130 return false; 3131 } 3132 TableState state = CatalogFamilyFormat.getTableState(r); 3133 if (state != null) { 3134 lastTableState.set(state); 3135 } 3136 return true; 3137 } 3138 }; 3139 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, 3140 Integer.MAX_VALUE, visitor); 3141 return lastTableState.get(); 3142 } 3143 3144 /** 3145 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 3146 * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent 3147 * table. 3148 * @param table the table to wait on. 3149 * @throws InterruptedException if interrupted while waiting 3150 * @throws IOException if an IO problem is encountered 3151 */ 3152 public void waitTableEnabled(TableName table) throws InterruptedException, IOException { 3153 waitTableEnabled(table, 30000); 3154 } 3155 3156 /** 3157 * Waits for a table to be 'enabled'. 
Enabled means that table is set as 'enabled' and the regions 3158 * have been all assigned. 3159 * @see #waitTableEnabled(TableName, long) 3160 * @param table Table to wait on. 3161 * @param timeoutMillis Time to wait on it being marked enabled. 3162 */ 3163 public void waitTableEnabled(byte[] table, long timeoutMillis) 3164 throws InterruptedException, IOException { 3165 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 3166 } 3167 3168 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 3169 waitFor(timeoutMillis, predicateTableEnabled(table)); 3170 } 3171 3172 /** 3173 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 3174 * after default period (30 seconds) 3175 * @param table Table to wait on. 3176 */ 3177 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 3178 waitTableDisabled(table, 30000); 3179 } 3180 3181 public void waitTableDisabled(TableName table, long millisTimeout) 3182 throws InterruptedException, IOException { 3183 waitFor(millisTimeout, predicateTableDisabled(table)); 3184 } 3185 3186 /** 3187 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 3188 * @param table Table to wait on. 3189 * @param timeoutMillis Time to wait on it being marked disabled. 
3190 */ 3191 public void waitTableDisabled(byte[] table, long timeoutMillis) 3192 throws InterruptedException, IOException { 3193 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 3194 } 3195 3196 /** 3197 * Make sure that at least the specified number of region servers are running 3198 * @param num minimum number of region servers that should be running 3199 * @return true if we started some servers 3200 */ 3201 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 3202 boolean startedServer = false; 3203 MiniHBaseCluster hbaseCluster = getMiniHBaseCluster(); 3204 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 3205 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 3206 startedServer = true; 3207 } 3208 3209 return startedServer; 3210 } 3211 3212 /** 3213 * Make sure that at least the specified number of region servers are running. We don't count the 3214 * ones that are currently stopping or are stopped. 3215 * @param num minimum number of region servers that should be running 3216 * @return true if we started some servers 3217 */ 3218 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 3219 boolean startedServer = ensureSomeRegionServersAvailable(num); 3220 3221 int nonStoppedServers = 0; 3222 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 3223 3224 HRegionServer hrs = rst.getRegionServer(); 3225 if (hrs.isStopping() || hrs.isStopped()) { 3226 LOG.info("A region server is stopped or stopping:" + hrs); 3227 } else { 3228 nonStoppedServers++; 3229 } 3230 } 3231 for (int i = nonStoppedServers; i < num; ++i) { 3232 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 3233 startedServer = true; 3234 } 3235 return startedServer; 3236 } 3237 3238 /** 3239 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 
3240 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 3241 * @param c Initial configuration 3242 * @param differentiatingSuffix Suffix to differentiate this user from others. 3243 * @return A new configuration instance with a different user set into it. 3244 */ 3245 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 3246 throws IOException { 3247 FileSystem currentfs = FileSystem.get(c); 3248 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 3249 return User.getCurrent(); 3250 } 3251 // Else distributed filesystem. Make a new instance per daemon. Below 3252 // code is taken from the AppendTestUtil over in hdfs. 3253 String username = User.getCurrent().getName() + differentiatingSuffix; 3254 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 3255 return user; 3256 } 3257 3258 public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) 3259 throws IOException { 3260 NavigableSet<String> online = new TreeSet<>(); 3261 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 3262 try { 3263 for (RegionInfo region : ProtobufUtil 3264 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 3265 online.add(region.getRegionNameAsString()); 3266 } 3267 } catch (RegionServerStoppedException e) { 3268 // That's fine. 3269 } 3270 } 3271 return online; 3272 } 3273 3274 /** 3275 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests 3276 * linger. Here is the exception you'll see: 3277 * 3278 * <pre> 3279 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 3280 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 3281 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 3282 * failed 4 times. 
Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 3283 * </pre> 3284 * 3285 * @param stream A DFSClient.DFSOutputStream. 3286 */ 3287 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 3288 try { 3289 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 3290 for (Class<?> clazz : clazzes) { 3291 String className = clazz.getSimpleName(); 3292 if (className.equals("DFSOutputStream")) { 3293 if (clazz.isInstance(stream)) { 3294 Field maxRecoveryErrorCountField = 3295 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 3296 maxRecoveryErrorCountField.setAccessible(true); 3297 maxRecoveryErrorCountField.setInt(stream, max); 3298 break; 3299 } 3300 } 3301 } 3302 } catch (Exception e) { 3303 LOG.info("Could not set max recovery field", e); 3304 } 3305 } 3306 3307 /** 3308 * Uses directly the assignment manager to assign the region. and waits until the specified region 3309 * has completed assignment. 3310 * @return true if the region is assigned false otherwise. 3311 */ 3312 public boolean assignRegion(final RegionInfo regionInfo) 3313 throws IOException, InterruptedException { 3314 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 3315 am.assign(regionInfo); 3316 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 3317 } 3318 3319 /** 3320 * Move region to destination server and wait till region is completely moved and online 3321 * @param destRegion region to move 3322 * @param destServer destination server of the region 3323 */ 3324 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3325 throws InterruptedException, IOException { 3326 HMaster master = getMiniHBaseCluster().getMaster(); 3327 // TODO: Here we start the move. The move can take a while. 
3328 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3329 while (true) { 3330 ServerName serverName = 3331 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3332 if (serverName != null && serverName.equals(destServer)) { 3333 assertRegionOnServer(destRegion, serverName, 2000); 3334 break; 3335 } 3336 Thread.sleep(10); 3337 } 3338 } 3339 3340 /** 3341 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3342 * configuable timeout value (default is 60 seconds) This means all regions have been deployed, 3343 * master has been informed and updated hbase:meta with the regions deployed server. 3344 * @param tableName the table name 3345 */ 3346 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3347 waitUntilAllRegionsAssigned(tableName, 3348 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3349 } 3350 3351 /** 3352 * Waith until all system table's regions get assigned 3353 */ 3354 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3355 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3356 } 3357 3358 /** 3359 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3360 * timeout. This means all regions have been deployed, master has been informed and updated 3361 * hbase:meta with the regions deployed server. 3362 * @param tableName the table name 3363 * @param timeout timeout, in milliseconds 3364 */ 3365 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3366 throws IOException { 3367 if (!TableName.isMetaTableName(tableName)) { 3368 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3369 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. 
Timeout = " 3370 + timeout + "ms"); 3371 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3372 @Override 3373 public String explainFailure() throws IOException { 3374 return explainTableAvailability(tableName); 3375 } 3376 3377 @Override 3378 public boolean evaluate() throws IOException { 3379 Scan scan = new Scan(); 3380 scan.addFamily(HConstants.CATALOG_FAMILY); 3381 boolean tableFound = false; 3382 try (ResultScanner s = meta.getScanner(scan)) { 3383 for (Result r; (r = s.next()) != null;) { 3384 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3385 RegionInfo info = RegionInfo.parseFromOrNull(b); 3386 if (info != null && info.getTable().equals(tableName)) { 3387 // Get server hosting this region from catalog family. Return false if no server 3388 // hosting this region, or if the server hosting this region was recently killed 3389 // (for fault tolerance testing). 3390 tableFound = true; 3391 byte[] server = 3392 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3393 if (server == null) { 3394 return false; 3395 } else { 3396 byte[] startCode = 3397 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3398 ServerName serverName = 3399 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3400 + Bytes.toLong(startCode)); 3401 if ( 3402 !getHBaseClusterInterface().isDistributedCluster() 3403 && getHBaseCluster().isKilledRS(serverName) 3404 ) { 3405 return false; 3406 } 3407 } 3408 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3409 return false; 3410 } 3411 } 3412 } 3413 } 3414 if (!tableFound) { 3415 LOG.warn( 3416 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3417 } 3418 return tableFound; 3419 } 3420 }); 3421 } 3422 } 3423 LOG.info("All regions for table " + tableName + " assigned to meta. 
Checking AM states."); 3424 // check from the master state if we are using a mini cluster 3425 if (!getHBaseClusterInterface().isDistributedCluster()) { 3426 // So, all regions are in the meta table but make sure master knows of the assignments before 3427 // returning -- sometimes this can lag. 3428 HMaster master = getHBaseCluster().getMaster(); 3429 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3430 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3431 @Override 3432 public String explainFailure() throws IOException { 3433 return explainTableAvailability(tableName); 3434 } 3435 3436 @Override 3437 public boolean evaluate() throws IOException { 3438 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3439 return hris != null && !hris.isEmpty(); 3440 } 3441 }); 3442 } 3443 LOG.info("All regions for table " + tableName + " assigned."); 3444 } 3445 3446 /** 3447 * Do a small get/scan against one store. This is required because store has no actual methods of 3448 * querying itself, and relies on StoreScanner. 3449 */ 3450 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3451 Scan scan = new Scan(get); 3452 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3453 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3454 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3455 // readpoint 0. 3456 0); 3457 3458 List<Cell> result = new ArrayList<>(); 3459 scanner.next(result); 3460 if (!result.isEmpty()) { 3461 // verify that we are on the row we want: 3462 Cell kv = result.get(0); 3463 if (!CellUtil.matchingRows(kv, get.getRow())) { 3464 result.clear(); 3465 } 3466 } 3467 scanner.close(); 3468 return result; 3469 } 3470 3471 /** 3472 * Create region split keys between startkey and endKey 3473 * @param numRegions the number of regions to be created. it has to be greater than 3. 
3474 * @return resulting split keys 3475 */ 3476 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3477 if (numRegions <= 3) { 3478 throw new AssertionError(); 3479 } 3480 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3481 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3482 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3483 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3484 return result; 3485 } 3486 3487 /** 3488 * Do a small get/scan against one store. This is required because store has no actual methods of 3489 * querying itself, and relies on StoreScanner. 3490 */ 3491 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3492 throws IOException { 3493 Get get = new Get(row); 3494 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3495 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3496 3497 return getFromStoreFile(store, get); 3498 } 3499 3500 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3501 final List<? extends Cell> actual) { 3502 final int eLen = expected.size(); 3503 final int aLen = actual.size(); 3504 final int minLen = Math.min(eLen, aLen); 3505 3506 int i; 3507 for (i = 0; i < minLen 3508 && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) { 3509 } 3510 3511 if (additionalMsg == null) { 3512 additionalMsg = ""; 3513 } 3514 if (!additionalMsg.isEmpty()) { 3515 additionalMsg = ". " + additionalMsg; 3516 } 3517 3518 if (eLen != aLen || i != minLen) { 3519 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3520 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. 
" + safeGetAsStr(actual, i) 3521 + " (length " + aLen + ")" + additionalMsg); 3522 } 3523 } 3524 3525 public static <T> String safeGetAsStr(List<T> lst, int i) { 3526 if (0 <= i && i < lst.size()) { 3527 return lst.get(i).toString(); 3528 } else { 3529 return "<out_of_range>"; 3530 } 3531 } 3532 3533 public String getRpcConnnectionURI() throws UnknownHostException { 3534 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3535 } 3536 3537 public String getZkConnectionURI() { 3538 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3539 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3540 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3541 } 3542 3543 /** 3544 * Get the zk based cluster key for this cluster. 3545 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the 3546 * connection info of a cluster. Keep here only for compatibility. 3547 * @see #getRpcConnnectionURI() 3548 * @see #getZkConnectionURI() 3549 */ 3550 @Deprecated 3551 public String getClusterKey() { 3552 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3553 + ":" 3554 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3555 } 3556 3557 /** Creates a random table with the given parameters */ 3558 public Table createRandomTable(TableName tableName, final Collection<String> families, 3559 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3560 final int numRowsPerFlush) throws IOException, InterruptedException { 3561 3562 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3563 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3564 + maxVersions + "\n"); 3565 3566 final int numCF = families.size(); 3567 final byte[][] cfBytes = new byte[numCF][]; 3568 { 3569 int cfIndex = 0; 3570 for (String cf : 
families) { 3571 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3572 } 3573 } 3574 3575 final int actualStartKey = 0; 3576 final int actualEndKey = Integer.MAX_VALUE; 3577 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3578 final int splitStartKey = actualStartKey + keysPerRegion; 3579 final int splitEndKey = actualEndKey - keysPerRegion; 3580 final String keyFormat = "%08x"; 3581 final Table table = createTable(tableName, cfBytes, maxVersions, 3582 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3583 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3584 3585 if (hbaseCluster != null) { 3586 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3587 } 3588 3589 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3590 3591 final Random rand = ThreadLocalRandom.current(); 3592 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3593 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3594 final byte[] row = Bytes.toBytes( 3595 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3596 3597 Put put = new Put(row); 3598 Delete del = new Delete(row); 3599 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3600 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3601 final long ts = rand.nextInt(); 3602 final byte[] qual = Bytes.toBytes("col" + iCol); 3603 if (rand.nextBoolean()) { 3604 final byte[] value = 3605 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3606 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3607 put.addColumn(cf, qual, ts, value); 3608 } else if (rand.nextDouble() < 0.8) { 3609 del.addColumn(cf, qual, ts); 3610 } else { 3611 del.addColumns(cf, qual, ts); 3612 } 3613 } 3614 3615 if (!put.isEmpty()) { 3616 mutator.mutate(put); 3617 } 3618 3619 if (!del.isEmpty()) { 3620 mutator.mutate(del); 3621 } 3622 } 3623 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3624 mutator.flush(); 3625 if 
(hbaseCluster != null) { 3626 getMiniHBaseCluster().flushcache(table.getName()); 3627 } 3628 } 3629 mutator.close(); 3630 3631 return table; 3632 } 3633 3634 public static int randomFreePort() { 3635 return HBaseCommonTestingUtility.randomFreePort(); 3636 } 3637 3638 public static String randomMultiCastAddress() { 3639 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3640 } 3641 3642 public static void waitForHostPort(String host, int port) throws IOException { 3643 final int maxTimeMs = 10000; 3644 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3645 IOException savedException = null; 3646 LOG.info("Waiting for server at " + host + ":" + port); 3647 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3648 try { 3649 Socket sock = new Socket(InetAddress.getByName(host), port); 3650 sock.close(); 3651 savedException = null; 3652 LOG.info("Server at " + host + ":" + port + " is available"); 3653 break; 3654 } catch (UnknownHostException e) { 3655 throw new IOException("Failed to look up " + host, e); 3656 } catch (IOException e) { 3657 savedException = e; 3658 } 3659 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3660 } 3661 3662 if (savedException != null) { 3663 throw savedException; 3664 } 3665 } 3666 3667 /** 3668 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3669 * continues. 3670 * @return the number of regions the table was split into 3671 */ 3672 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3673 byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding) 3674 throws IOException { 3675 return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression, 3676 dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT); 3677 } 3678 3679 /** 3680 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3681 * continues. 
3682 * @return the number of regions the table was split into 3683 */ 3684 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3685 byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding, 3686 int numRegionsPerServer, int regionReplication, Durability durability) throws IOException { 3687 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 3688 builder.setDurability(durability); 3689 builder.setRegionReplication(regionReplication); 3690 ColumnFamilyDescriptorBuilder cfBuilder = 3691 ColumnFamilyDescriptorBuilder.newBuilder(columnFamily); 3692 cfBuilder.setDataBlockEncoding(dataBlockEncoding); 3693 cfBuilder.setCompressionType(compression); 3694 return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(), 3695 numRegionsPerServer); 3696 } 3697 3698 /** 3699 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3700 * continues. 3701 * @return the number of regions the table was split into 3702 */ 3703 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3704 byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding, 3705 int numRegionsPerServer, int regionReplication, Durability durability) throws IOException { 3706 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 3707 builder.setDurability(durability); 3708 builder.setRegionReplication(regionReplication); 3709 ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length]; 3710 for (int i = 0; i < columnFamilies.length; i++) { 3711 ColumnFamilyDescriptorBuilder cfBuilder = 3712 ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]); 3713 cfBuilder.setDataBlockEncoding(dataBlockEncoding); 3714 cfBuilder.setCompressionType(compression); 3715 hcds[i] = cfBuilder.build(); 3716 } 3717 return createPreSplitLoadTestTable(conf, builder.build(), hcds, 
numRegionsPerServer); 3718 } 3719 3720 /** 3721 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3722 * continues. 3723 * @return the number of regions the table was split into 3724 */ 3725 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3726 ColumnFamilyDescriptor hcd) throws IOException { 3727 return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER); 3728 } 3729 3730 /** 3731 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3732 * continues. 3733 * @return the number of regions the table was split into 3734 */ 3735 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3736 ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException { 3737 return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd }, 3738 numRegionsPerServer); 3739 } 3740 3741 /** 3742 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3743 * continues. 3744 * @return the number of regions the table was split into 3745 */ 3746 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc, 3747 ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException { 3748 return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(), 3749 numRegionsPerServer); 3750 } 3751 3752 /** 3753 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3754 * continues. 
* @return the number of regions the table was split into
 */
public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
  ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer)
  throws IOException {
  // Add only the column families the descriptor does not already declare.
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
  for (ColumnFamilyDescriptor cd : cds) {
    if (!td.hasColumnFamily(cd.getName())) {
      builder.setColumnFamily(cd);
    }
  }
  td = builder.build();
  int totalNumberOfRegions = 0;

  // try-with-resources closes the admin and then the connection, even when getAdmin() or
  // admin.close() throws; the previous finally block leaked the connection in those cases.
  try (Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
    Admin admin = unmanagedConnection.getAdmin()) {
    try {
      // create a table a pre-splits regions.
      // The number of splits is set as:
      // region servers * regions per region server).
      int numberOfServers = admin.getRegionServers().size();
      if (numberOfServers == 0) {
        throw new IllegalStateException("No live regionservers");
      }

      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
      LOG.info("Number of live regionservers: " + numberOfServers + ", "
        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
        + numRegionsPerServer + ")");

      byte[][] splits = splitter.split(totalNumberOfRegions);

      admin.createTable(td, splits);
    } catch (MasterNotRunningException e) {
      LOG.error("Master not running", e);
      throw new IOException(e);
    } catch (TableExistsException e) {
      // Deliberately tolerated: the computed region count is still returned.
      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
    }
  }
  return totalNumberOfRegions;
}

/**
 * Returns the port of the server that the region locator reports for the empty row of the meta
 * table.
 * @param connection connection used to obtain the meta region locator
 */
public static int getMetaRSPort(Connection connection) throws IOException {
  try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
    return locator.getRegionLocation(Bytes.toBytes("")).getPort();
  }
}

3806 /** 3807 * Due to async racing issue, a region may not be in the online region list of a region server 3808 * yet, after the assignment znode is deleted and the new assignment is recorded in master. 3809 */ 3810 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3811 final long timeout) throws IOException, InterruptedException { 3812 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3813 while (true) { 3814 List<RegionInfo> regions = getAdmin().getRegions(server); 3815 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3816 long now = EnvironmentEdgeManager.currentTime(); 3817 if (now > timeoutTime) break; 3818 Thread.sleep(10); 3819 } 3820 throw new AssertionError( 3821 "Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3822 } 3823 3824 /** 3825 * Check to make sure the region is open on the specified region server, but not on any other one. 3826 */ 3827 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3828 final long timeout) throws IOException, InterruptedException { 3829 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3830 while (true) { 3831 List<RegionInfo> regions = getAdmin().getRegions(server); 3832 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3833 List<JVMClusterUtil.RegionServerThread> rsThreads = 3834 getHBaseCluster().getLiveRegionServerThreads(); 3835 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3836 HRegionServer rs = rsThread.getRegionServer(); 3837 if (server.equals(rs.getServerName())) { 3838 continue; 3839 } 3840 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3841 for (HRegion r : hrs) { 3842 if (r.getRegionInfo().getRegionId() == hri.getRegionId()) { 3843 throw new AssertionError("Region should not be double assigned"); 3844 } 3845 } 3846 } 3847 return; // good, we are happy 3848 } 3849 long now = 
EnvironmentEdgeManager.currentTime(); 3850 if (now > timeoutTime) break; 3851 Thread.sleep(10); 3852 } 3853 throw new AssertionError( 3854 "Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3855 } 3856 3857 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException { 3858 TableDescriptor td = 3859 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3860 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3861 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3862 } 3863 3864 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3865 BlockCache blockCache) throws IOException { 3866 TableDescriptor td = 3867 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3868 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3869 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3870 } 3871 3872 public static void setFileSystemURI(String fsURI) { 3873 FS_URI = fsURI; 3874 } 3875 3876 /** 3877 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3878 */ 3879 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3880 return new ExplainingPredicate<IOException>() { 3881 @Override 3882 public String explainFailure() throws IOException { 3883 final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager(); 3884 return "found in transition: " + am.getRegionsInTransition().toString(); 3885 } 3886 3887 @Override 3888 public boolean evaluate() throws IOException { 3889 HMaster master = getMiniHBaseCluster().getMaster(); 3890 if (master == null) return false; 3891 AssignmentManager am = master.getAssignmentManager(); 3892 if (am == null) return false; 3893 return !am.hasRegionsInTransition(); 3894 } 3895 }; 
3896 } 3897 3898 /** 3899 * Returns a {@link Predicate} for checking that table is enabled 3900 */ 3901 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3902 return new ExplainingPredicate<IOException>() { 3903 @Override 3904 public String explainFailure() throws IOException { 3905 return explainTableState(tableName, TableState.State.ENABLED); 3906 } 3907 3908 @Override 3909 public boolean evaluate() throws IOException { 3910 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3911 } 3912 }; 3913 } 3914 3915 /** 3916 * Returns a {@link Predicate} for checking that table is enabled 3917 */ 3918 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3919 return new ExplainingPredicate<IOException>() { 3920 @Override 3921 public String explainFailure() throws IOException { 3922 return explainTableState(tableName, TableState.State.DISABLED); 3923 } 3924 3925 @Override 3926 public boolean evaluate() throws IOException { 3927 return getAdmin().isTableDisabled(tableName); 3928 } 3929 }; 3930 } 3931 3932 /** 3933 * Returns a {@link Predicate} for checking that table is enabled 3934 */ 3935 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3936 return new ExplainingPredicate<IOException>() { 3937 @Override 3938 public String explainFailure() throws IOException { 3939 return explainTableAvailability(tableName); 3940 } 3941 3942 @Override 3943 public boolean evaluate() throws IOException { 3944 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3945 if (tableAvailable) { 3946 try (Table table = getConnection().getTable(tableName)) { 3947 TableDescriptor htd = table.getDescriptor(); 3948 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3949 .getAllRegionLocations()) { 3950 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3951 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 
3952 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3953 for (byte[] family : htd.getColumnFamilyNames()) { 3954 scan.addFamily(family); 3955 } 3956 try (ResultScanner scanner = table.getScanner(scan)) { 3957 scanner.next(); 3958 } 3959 } 3960 } 3961 } 3962 return tableAvailable; 3963 } 3964 }; 3965 } 3966 3967 /** 3968 * Wait until no regions in transition. 3969 * @param timeout How long to wait. 3970 */ 3971 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3972 waitFor(timeout, predicateNoRegionsInTransition()); 3973 } 3974 3975 /** 3976 * Wait until no regions in transition. (time limit 15min) 3977 */ 3978 public void waitUntilNoRegionsInTransition() throws IOException { 3979 waitUntilNoRegionsInTransition(15 * 60000); 3980 } 3981 3982 /** 3983 * Wait until labels is ready in VisibilityLabelsCache. 3984 */ 3985 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3986 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3987 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3988 3989 @Override 3990 public boolean evaluate() { 3991 for (String label : labels) { 3992 if (labelsCache.getLabelOrdinal(label) == 0) { 3993 return false; 3994 } 3995 } 3996 return true; 3997 } 3998 3999 @Override 4000 public String explainFailure() { 4001 for (String label : labels) { 4002 if (labelsCache.getLabelOrdinal(label) == 0) { 4003 return label + " is not available yet"; 4004 } 4005 } 4006 return ""; 4007 } 4008 }); 4009 } 4010 4011 /** 4012 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 4013 * available. 4014 * @return the list of column descriptors 4015 */ 4016 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 4017 return generateColumnDescriptors(""); 4018 } 4019 4020 /** 4021 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 4022 * available. 
4023 * @param prefix family names prefix 4024 * @return the list of column descriptors 4025 */ 4026 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 4027 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 4028 long familyId = 0; 4029 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 4030 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 4031 for (BloomType bloomType : BloomType.values()) { 4032 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 4033 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 4034 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 4035 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 4036 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 4037 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 4038 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 4039 familyId++; 4040 } 4041 } 4042 } 4043 return columnFamilyDescriptors; 4044 } 4045 4046 /** 4047 * Get supported compression algorithms. 4048 * @return supported compression algorithms. 
4049 */ 4050 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 4051 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 4052 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 4053 for (String algoName : allAlgos) { 4054 try { 4055 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 4056 algo.getCompressor(); 4057 supportedAlgos.add(algo); 4058 } catch (Throwable t) { 4059 // this algo is not available 4060 } 4061 } 4062 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 4063 } 4064 4065 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 4066 Scan scan = new Scan().withStartRow(row); 4067 scan.setReadType(ReadType.PREAD); 4068 scan.setCaching(1); 4069 scan.setReversed(true); 4070 scan.addFamily(family); 4071 try (RegionScanner scanner = r.getScanner(scan)) { 4072 List<Cell> cells = new ArrayList<>(1); 4073 scanner.next(cells); 4074 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 4075 return null; 4076 } 4077 return Result.create(cells); 4078 } 4079 } 4080 4081 private boolean isTargetTable(final byte[] inRow, Cell c) { 4082 String inputRowString = Bytes.toString(inRow); 4083 int i = inputRowString.indexOf(HConstants.DELIMITER); 4084 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 4085 int o = outputRowString.indexOf(HConstants.DELIMITER); 4086 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 4087 } 4088 4089 /** 4090 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 4091 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 4092 * kerby KDC server and utility for using it, 4093 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 4094 * less baggage. It came in in HBASE-5291. 
4095 */ 4096 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 4097 Properties conf = MiniKdc.createConf(); 4098 conf.put(MiniKdc.DEBUG, true); 4099 MiniKdc kdc = null; 4100 File dir = null; 4101 // There is time lag between selecting a port and trying to bind with it. It's possible that 4102 // another service captures the port in between which'll result in BindException. 4103 boolean bindException; 4104 int numTries = 0; 4105 do { 4106 try { 4107 bindException = false; 4108 dir = new File(getDataTestDir("kdc").toUri().getPath()); 4109 kdc = new MiniKdc(conf, dir); 4110 kdc.start(); 4111 } catch (BindException e) { 4112 FileUtils.deleteDirectory(dir); // clean directory 4113 numTries++; 4114 if (numTries == 3) { 4115 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 4116 throw e; 4117 } 4118 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 4119 bindException = true; 4120 } 4121 } while (bindException); 4122 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 4123 return kdc; 4124 } 4125 4126 public int getNumHFiles(final TableName tableName, final byte[] family) { 4127 int numHFiles = 0; 4128 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 4129 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 4130 } 4131 return numHFiles; 4132 } 4133 4134 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 4135 final byte[] family) { 4136 int numHFiles = 0; 4137 for (Region region : rs.getRegions(tableName)) { 4138 numHFiles += region.getStore(family).getStorefilesCount(); 4139 } 4140 return numHFiles; 4141 } 4142 4143 private void assertEquals(String message, int expected, int actual) { 4144 if (expected == actual) { 4145 return; 4146 } 4147 String formatted = ""; 4148 if (message != null && !"".equals(message)) { 4149 formatted = message + " "; 4150 } 4151 throw new 
AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">"); 4152 } 4153 4154 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 4155 if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) { 4156 throw new AssertionError(); 4157 } 4158 assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode()); 4159 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 4160 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 4161 assertEquals("", ltdFamilies.size(), rtdFamilies.size()); 4162 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 4163 it2 = rtdFamilies.iterator(); it.hasNext();) { 4164 assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 4165 } 4166 } 4167 4168 /** 4169 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 4170 * invocations. 4171 */ 4172 public static void await(final long sleepMillis, final BooleanSupplier condition) 4173 throws InterruptedException { 4174 try { 4175 while (!condition.getAsBoolean()) { 4176 Thread.sleep(sleepMillis); 4177 } 4178 } catch (RuntimeException e) { 4179 if (e.getCause() instanceof AssertionError) { 4180 throw (AssertionError) e.getCause(); 4181 } 4182 throw e; 4183 } 4184 } 4185}