001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.File; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.io.UncheckedIOException; 024import java.lang.reflect.Field; 025import java.lang.reflect.Modifier; 026import java.net.BindException; 027import java.net.DatagramSocket; 028import java.net.InetAddress; 029import java.net.ServerSocket; 030import java.net.Socket; 031import java.net.UnknownHostException; 032import java.nio.charset.StandardCharsets; 033import java.security.MessageDigest; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Map; 042import java.util.NavigableSet; 043import java.util.Properties; 044import java.util.Random; 045import java.util.Set; 046import java.util.TreeSet; 047import java.util.concurrent.ThreadLocalRandom; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicReference; 050import java.util.function.BooleanSupplier; 051import org.apache.commons.io.FileUtils; 
052import org.apache.commons.lang3.RandomStringUtils; 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 057import org.apache.hadoop.hbase.Waiter.Predicate; 058import org.apache.hadoop.hbase.client.Admin; 059import org.apache.hadoop.hbase.client.AsyncClusterConnection; 060import org.apache.hadoop.hbase.client.BufferedMutator; 061import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 064import org.apache.hadoop.hbase.client.Connection; 065import org.apache.hadoop.hbase.client.ConnectionFactory; 066import org.apache.hadoop.hbase.client.Consistency; 067import org.apache.hadoop.hbase.client.Delete; 068import org.apache.hadoop.hbase.client.Durability; 069import org.apache.hadoop.hbase.client.Get; 070import org.apache.hadoop.hbase.client.Hbck; 071import org.apache.hadoop.hbase.client.MasterRegistry; 072import org.apache.hadoop.hbase.client.Put; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.client.RegionLocator; 076import org.apache.hadoop.hbase.client.Result; 077import org.apache.hadoop.hbase.client.ResultScanner; 078import org.apache.hadoop.hbase.client.Scan; 079import org.apache.hadoop.hbase.client.Scan.ReadType; 080import org.apache.hadoop.hbase.client.Table; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.client.TableState; 084import org.apache.hadoop.hbase.fs.HFileSystem; 085import org.apache.hadoop.hbase.io.compress.Compression; 086import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 087import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 088import 
org.apache.hadoop.hbase.io.hfile.BlockCache; 089import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 090import org.apache.hadoop.hbase.io.hfile.HFile; 091import org.apache.hadoop.hbase.ipc.RpcServerInterface; 092import org.apache.hadoop.hbase.keymeta.KeymetaAdminClient; 093import org.apache.hadoop.hbase.logging.Log4jUtils; 094import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 095import org.apache.hadoop.hbase.master.HMaster; 096import org.apache.hadoop.hbase.master.RegionState; 097import org.apache.hadoop.hbase.master.ServerManager; 098import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 099import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 100import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 101import org.apache.hadoop.hbase.master.assignment.RegionStates; 102import org.apache.hadoop.hbase.mob.MobFileCache; 103import org.apache.hadoop.hbase.regionserver.BloomType; 104import org.apache.hadoop.hbase.regionserver.ChunkCreator; 105import org.apache.hadoop.hbase.regionserver.HRegion; 106import org.apache.hadoop.hbase.regionserver.HRegionServer; 107import org.apache.hadoop.hbase.regionserver.HStore; 108import org.apache.hadoop.hbase.regionserver.InternalScanner; 109import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 110import org.apache.hadoop.hbase.regionserver.Region; 111import org.apache.hadoop.hbase.regionserver.RegionScanner; 112import org.apache.hadoop.hbase.regionserver.RegionServerServices; 113import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 114import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 115import org.apache.hadoop.hbase.security.User; 116import org.apache.hadoop.hbase.security.UserProvider; 117import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 118import org.apache.hadoop.hbase.util.Bytes; 119import org.apache.hadoop.hbase.util.CommonFSUtils; 120import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 121import 
org.apache.hadoop.hbase.util.FSUtils; 122import org.apache.hadoop.hbase.util.JVMClusterUtil; 123import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 124import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 125import org.apache.hadoop.hbase.util.Pair; 126import org.apache.hadoop.hbase.util.ReflectionUtils; 127import org.apache.hadoop.hbase.util.RegionSplitter; 128import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; 129import org.apache.hadoop.hbase.util.RetryCounter; 130import org.apache.hadoop.hbase.util.Threads; 131import org.apache.hadoop.hbase.wal.WAL; 132import org.apache.hadoop.hbase.wal.WALFactory; 133import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 134import org.apache.hadoop.hbase.zookeeper.ZKConfig; 135import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 136import org.apache.hadoop.hdfs.DFSClient; 137import org.apache.hadoop.hdfs.DFSConfigKeys; 138import org.apache.hadoop.hdfs.DistributedFileSystem; 139import org.apache.hadoop.hdfs.MiniDFSCluster; 140import org.apache.hadoop.hdfs.server.datanode.DataNode; 141import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 142import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 143import org.apache.hadoop.mapred.JobConf; 144import org.apache.hadoop.mapred.MiniMRCluster; 145import org.apache.hadoop.mapred.TaskLog; 146import org.apache.hadoop.metrics2.impl.JmxCacheBuster; 147import org.apache.hadoop.minikdc.MiniKdc; 148import org.apache.yetus.audience.InterfaceAudience; 149import org.apache.zookeeper.WatchedEvent; 150import org.apache.zookeeper.ZooKeeper; 151import org.apache.zookeeper.ZooKeeper.States; 152 153import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 154 155import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 156 157/** 158 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 159 * functionality. Create an instance and keep it around testing HBase. 
This class is meant to be
 * your one-stop shop for anything you might need testing. Manages one cluster at a time only.
 * Managed cluster can be an in-process {@link MiniHBaseCluster}, or a deployed cluster of type
 * {@code DistributedHBaseCluster}. Not all methods work with the real cluster. Depends on log4j
 * being on classpath and hbase-site.xml for logging and test-run configuration. It does not set
 * logging levels. In the configuration properties, default values for master-info-port and
 * region-server-port are overridden such that a random port will be assigned (thus avoiding port
 * contention if another local HBase instance is already running).
 * <p>
 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
 * setting it to true.
 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
 */
@InterfaceAudience.Public
@Deprecated
public class HBaseTestingUtility extends HBaseZKTestingUtility {

  /** Configuration key for the number of regions per region server in pre-split tables. */
  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  // Backing mini HDFS cluster; null until one of the startMiniDFSCluster methods runs.
  private MiniDFSCluster dfsCluster = null;
  // Watchdog thread that works around HBASE-27148; created/shut down alongside dfsCluster.
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  // The managed HBase cluster (mini or distributed); volatile because it can be swapped
  // via setHBaseCluster while other threads observe it.
  private volatile HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  // Local directory where hadoop log files get written; set by createDirsAndSetProperties().
  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility
   */
  private Path dataTestDirOnTestFS = null;

  // Lazily created shared cluster connection; AtomicReference gives race-free one-time publish.
  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  private Admin hbaseAdmin = null;

  static {
    // JmxCacheBuster may cause dead lock in test environment. As on master side, the table/region
    // related metrics updating will finally lead to a meta access, so if meta is not online yet, we
    // will block when updating while holding the metrics lock. But when we assign meta, there are
    // bunch of places where we need to register a new metrics thus need to get the metrics lock,
    // and then lead to a dead lock and cause the test to hang forever.
    // The code is in hadoop so there is no easy way for us to fix, so here we just stop
    // JmxCacheBuster to stabilize our tests first. See HBASE-30118 for more details and future
    // plans.
    JmxCacheBuster.stop();
  }

  /**
   * Checks to see if a specific port is available.
223 * @param port the port number to check for availability 224 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 225 */ 226 public static boolean available(int port) { 227 ServerSocket ss = null; 228 DatagramSocket ds = null; 229 try { 230 ss = new ServerSocket(port); 231 ss.setReuseAddress(true); 232 ds = new DatagramSocket(port); 233 ds.setReuseAddress(true); 234 return true; 235 } catch (IOException e) { 236 // Do nothing 237 } finally { 238 if (ds != null) { 239 ds.close(); 240 } 241 242 if (ss != null) { 243 try { 244 ss.close(); 245 } catch (IOException e) { 246 /* should not be thrown */ 247 } 248 } 249 } 250 251 return false; 252 } 253 254 /** 255 * Create all combinations of Bloom filters and compression algorithms for testing. 256 */ 257 private static List<Object[]> bloomAndCompressionCombinations() { 258 List<Object[]> configurations = new ArrayList<>(); 259 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) { 260 for (BloomType bloomType : BloomType.values()) { 261 configurations.add(new Object[] { comprAlgo, bloomType }); 262 } 263 } 264 return Collections.unmodifiableList(configurations); 265 } 266 267 /** 268 * Create combination of memstoreTS and tags 269 */ 270 private static List<Object[]> memStoreTSAndTagsCombination() { 271 List<Object[]> configurations = new ArrayList<>(); 272 configurations.add(new Object[] { false, false }); 273 configurations.add(new Object[] { false, true }); 274 configurations.add(new Object[] { true, false }); 275 configurations.add(new Object[] { true, true }); 276 return Collections.unmodifiableList(configurations); 277 } 278 279 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 280 List<Object[]> configurations = new ArrayList<>(); 281 configurations.add(new Object[] { false, false, true }); 282 configurations.add(new Object[] { false, false, false }); 283 configurations.add(new Object[] { false, true, true }); 284 
configurations.add(new Object[] { false, true, false }); 285 configurations.add(new Object[] { true, false, true }); 286 configurations.add(new Object[] { true, false, false }); 287 configurations.add(new Object[] { true, true, true }); 288 configurations.add(new Object[] { true, true, false }); 289 return Collections.unmodifiableList(configurations); 290 } 291 292 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 293 bloomAndCompressionCombinations(); 294 295 /** 296 * <p> 297 * Create an HBaseTestingUtility using a default configuration. 298 * <p> 299 * Initially, all tmp files are written to a local test data directory. Once 300 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 301 * data will be written to the DFS directory instead. 302 */ 303 public HBaseTestingUtility() { 304 this(HBaseConfiguration.create()); 305 } 306 307 /** 308 * <p> 309 * Create an HBaseTestingUtility using a given configuration. 310 * <p> 311 * Initially, all tmp files are written to a local test data directory. Once 312 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 313 * data will be written to the DFS directory instead. 
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtility(Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    // Local FS does not support the stream capabilities HBase normally enforces.
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set set it to true, thus making
    // tests opt-out for random port assignment
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and it's underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    // Tolerates null so callers can clean up unconditionally from finally blocks.
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been
   * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than use the return direct, its usually best to make a copy and use that. Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  /** Replaces the cluster managed by this utility (used by distributed-cluster test drivers). */
  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can
   * have many concurrent tests running if we need to. Moding a System property is not the way to do
   * concurrent instances -- another instance could grab the temporary value unintentionally -- but
   * not anything can do about it at moment; single instance only is how the minidfscluster works.
   * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir
   * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir
   * (We do not create them!).
   * @return The calculated data test build directory, if newly-created.
386 */ 387 @Override 388 protected Path setupDataTestDir() { 389 Path testPath = super.setupDataTestDir(); 390 if (null == testPath) { 391 return null; 392 } 393 394 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 395 396 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 397 // we want our own value to ensure uniqueness on the same machine 398 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 399 400 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 401 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 402 return testPath; 403 } 404 405 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 406 407 String sysValue = System.getProperty(propertyName); 408 409 if (sysValue != null) { 410 // There is already a value set. So we do nothing but hope 411 // that there will be no conflicts 412 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 413 + " so I do NOT create it in " + parent); 414 String confValue = conf.get(propertyName); 415 if (confValue != null && !confValue.endsWith(sysValue)) { 416 LOG.warn(propertyName + " property value differs in configuration and system: " 417 + "Configuration=" + confValue + " while System=" + sysValue 418 + " Erasing configuration value by system value."); 419 } 420 conf.set(propertyName, sysValue); 421 } else { 422 // Ok, it's not set, so we create it as a subdirectory 423 createSubDir(propertyName, parent, subDirName); 424 System.setProperty(propertyName, conf.get(propertyName)); 425 } 426 } 427 428 /** 429 * @return Where to write test data on the test filesystem; Returns working directory for the test 430 * filesystem by default 431 * @see #setupDataTestDirOnTestFS() 432 * @see #getTestFileSystem() 433 */ 434 private Path getBaseTestDirOnTestFS() throws IOException { 435 FileSystem fs = getTestFileSystem(); 436 return new 
Path(fs.getWorkingDirectory(), "test-data"); 437 } 438 439 /** 440 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 441 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 442 * on it. 443 * @return a unique path in the test filesystem 444 */ 445 public Path getDataTestDirOnTestFS() throws IOException { 446 if (dataTestDirOnTestFS == null) { 447 setupDataTestDirOnTestFS(); 448 } 449 450 return dataTestDirOnTestFS; 451 } 452 453 /** 454 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 455 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 456 * on it. 457 * @return a unique path in the test filesystem 458 * @param subdirName name of the subdir to create under the base test dir 459 */ 460 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 461 return new Path(getDataTestDirOnTestFS(), subdirName); 462 } 463 464 /** 465 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 466 * setup. 467 */ 468 private void setupDataTestDirOnTestFS() throws IOException { 469 if (dataTestDirOnTestFS != null) { 470 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 471 return; 472 } 473 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 474 } 475 476 /** 477 * Sets up a new path in test filesystem to be used by tests. 478 */ 479 private Path getNewDataTestDirOnTestFS() throws IOException { 480 // The file system can be either local, mini dfs, or if the configuration 481 // is supplied externally, it can be an external cluster FS. 
If it is a local 482 // file system, the tests should use getBaseTestDir, otherwise, we can use 483 // the working directory, and create a unique sub dir there 484 FileSystem fs = getTestFileSystem(); 485 Path newDataTestDir; 486 String randomStr = getRandomUUID().toString(); 487 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 488 newDataTestDir = new Path(getDataTestDir(), randomStr); 489 File dataTestDir = new File(newDataTestDir.toString()); 490 if (deleteOnExit()) dataTestDir.deleteOnExit(); 491 } else { 492 Path base = getBaseTestDirOnTestFS(); 493 newDataTestDir = new Path(base, randomStr); 494 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 495 } 496 return newDataTestDir; 497 } 498 499 /** 500 * Cleans the test data directory on the test filesystem. 501 * @return True if we removed the test dirs 502 */ 503 public boolean cleanupDataTestDirOnTestFS() throws IOException { 504 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 505 if (ret) dataTestDirOnTestFS = null; 506 return ret; 507 } 508 509 /** 510 * Cleans a subdirectory under the test data directory on the test filesystem. 
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    // Set by shutdown(); checked at the top of every loop iteration.
    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          // interrupt() is our wake-up signal from shutdown(); restore the flag and
          // loop back so the 'stopped' check decides whether to exit.
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          // Reach into each datanode's private FsDatasetAsyncDiskService via reflection and flip
          // its thread group to non-daemon (the HBASE-27148 workaround).
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          // Newer Hadoop versions renamed/removed the field; nothing to fix there.
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; It might because your Hadoop version > 3.2.3 or 3.3.4, "
            + "See HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    // Stops the watchdog loop and wakes the thread out of its sleep.
    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things
   * like HDFS block location verification. If you start MiniDFSCluster without host names, all
   * instances of the datanodes will have the same host name.
   * @param hosts hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception {
    if (hosts != null && hosts.length != 0) {
      // One datanode per supplied hostname.
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames DNs to run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  // Points the configuration's default filesystem at the just-started mini DFS cluster.
  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  /**
   * Start a minidfscluster with the given number of datanodes, rack names and host names, wire it
   * up as the default filesystem and re-root {@code hbase.rootdir} on it.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  // Starts a mini DFS cluster for WAL tests: fixed namenode port and 5 datanodes. The fourth
  // constructor argument is false here (true in the generic starter above) — presumably to skip
  // formatting so existing data survives; confirm against the MiniDFSCluster constructor javadoc.
  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to
   * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
   * the conf.
658 * 659 * <pre> 660 * Configuration conf = TEST_UTIL.getConfiguration(); 661 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 662 * Map.Entry<String, String> e = i.next(); 663 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 664 * } 665 * </pre> 666 */ 667 private void createDirsAndSetProperties() throws IOException { 668 setupClusterTestDir(); 669 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 670 createDirAndSetProperty("test.cache.data"); 671 createDirAndSetProperty("hadoop.tmp.dir"); 672 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 673 createDirAndSetProperty("mapreduce.cluster.local.dir"); 674 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 675 enableShortCircuit(); 676 677 Path root = getDataTestDirOnTestFS("hadoop"); 678 conf.set(MapreduceTestingShim.getMROutputDirProp(), 679 new Path(root, "mapred-output-dir").toString()); 680 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 681 conf.set("mapreduce.jobtracker.staging.root.dir", 682 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 683 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 684 conf.set("yarn.app.mapreduce.am.staging-dir", 685 new Path(root, "mapreduce-am-staging-root-dir").toString()); 686 687 // Frustrate yarn's and hdfs's attempts at writing /tmp. 688 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 
689 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 690 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 691 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 692 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 693 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 694 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 695 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 696 createDirAndSetProperty("dfs.journalnode.edits.dir"); 697 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 698 createDirAndSetProperty("nfs.dump.dir"); 699 createDirAndSetProperty("java.io.tmpdir"); 700 createDirAndSetProperty("dfs.journalnode.edits.dir"); 701 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 702 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 703 704 // disable metrics logger since it depend on commons-logging internal classes and we do not want 705 // commons-logging on our classpath 706 conf.setInt(DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 707 conf.setInt(DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, 0); 708 } 709 710 /** 711 * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families. 712 * Default to false. 713 */ 714 public boolean isNewVersionBehaviorEnabled() { 715 final String propName = "hbase.tests.new.version.behavior"; 716 String v = System.getProperty(propName); 717 if (v != null) { 718 return Boolean.parseBoolean(v); 719 } 720 return false; 721 } 722 723 /** 724 * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This 725 * allows to specify this parameter on the command line. If not set, default is true. 
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      // A system property (e.g. passed on the command line) wins over the configuration.
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  /**
   * Same as {@link #createDirAndSetProperty(String, String)} using the property name itself as the
   * relative directory path.
   */
  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  /**
   * Create a directory under the data test dir and publish its absolute path under
   * {@code property}, both as a Java system property and in the HBase configuration.
   * @param relPath relative path of the directory under the data test dir
   * @param property name of the system property and configuration key to set
   * @return the absolute path of the created directory
   */
  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    // mkdirs() return value deliberately ignored: the directory may already exist.
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      // Reset the cached test dir and default FS so a later start gets a fresh filesystem.
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
   * All other options will use default values, defined in {@link StartMiniClusterOption.Builder}.
   * @param createWALDir Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public MiniHBaseCluster startMiniCluster(boolean createWALDir) throws Exception {
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().createWALDir(createWALDir).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values,
   * defined in {@link StartMiniClusterOption.Builder}.
   * @param numSlaves Slave node number, for both HBase region server and HDFS data node.
   * @param createRootDir Whether to create a new root or data directory path.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniCluster(StartMiniClusterOption)} instead.
815 * @see #startMiniCluster(StartMiniClusterOption) 816 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 817 */ 818 @Deprecated 819 public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir) throws Exception { 820 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves) 821 .numDataNodes(numSlaves).createRootDir(createRootDir).build(); 822 return startMiniCluster(option); 823 } 824 825 /** 826 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 827 * defined in {@link StartMiniClusterOption.Builder}. 828 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 829 * @param createRootDir Whether to create a new root or data directory path. 830 * @param createWALDir Whether to create a new WAL directory. 831 * @return The mini HBase cluster created. 832 * @see #shutdownMiniCluster() 833 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 834 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 835 * @see #startMiniCluster(StartMiniClusterOption) 836 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 837 */ 838 @Deprecated 839 public MiniHBaseCluster startMiniCluster(int numSlaves, boolean createRootDir, 840 boolean createWALDir) throws Exception { 841 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(numSlaves) 842 .numDataNodes(numSlaves).createRootDir(createRootDir).createWALDir(createWALDir).build(); 843 return startMiniCluster(option); 844 } 845 846 /** 847 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 848 * defined in {@link StartMiniClusterOption.Builder}. 849 * @param numMasters Master node number. 850 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 851 * @param createRootDir Whether to create a new root or data directory path. 
852 * @return The mini HBase cluster created. 853 * @see #shutdownMiniCluster() 854 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 855 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 856 * @see #startMiniCluster(StartMiniClusterOption) 857 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 858 */ 859 @Deprecated 860 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, boolean createRootDir) 861 throws Exception { 862 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 863 .numRegionServers(numSlaves).createRootDir(createRootDir).numDataNodes(numSlaves).build(); 864 return startMiniCluster(option); 865 } 866 867 /** 868 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 869 * defined in {@link StartMiniClusterOption.Builder}. 870 * @param numMasters Master node number. 871 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 872 * @return The mini HBase cluster created. 873 * @see #shutdownMiniCluster() 874 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 875 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 876 * @see #startMiniCluster(StartMiniClusterOption) 877 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 878 */ 879 @Deprecated 880 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves) throws Exception { 881 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 882 .numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 883 return startMiniCluster(option); 884 } 885 886 /** 887 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 888 * defined in {@link StartMiniClusterOption.Builder}. 889 * @param numMasters Master node number. 
890 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 891 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 892 * HDFS data node number. 893 * @param createRootDir Whether to create a new root or data directory path. 894 * @return The mini HBase cluster created. 895 * @see #shutdownMiniCluster() 896 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 897 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 898 * @see #startMiniCluster(StartMiniClusterOption) 899 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 900 */ 901 @Deprecated 902 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts, 903 boolean createRootDir) throws Exception { 904 StartMiniClusterOption option = 905 StartMiniClusterOption.builder().numMasters(numMasters).numRegionServers(numSlaves) 906 .createRootDir(createRootDir).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build(); 907 return startMiniCluster(option); 908 } 909 910 /** 911 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 912 * defined in {@link StartMiniClusterOption.Builder}. 913 * @param numMasters Master node number. 914 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 915 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 916 * HDFS data node number. 917 * @return The mini HBase cluster created. 918 * @see #shutdownMiniCluster() 919 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 920 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 
921 * @see #startMiniCluster(StartMiniClusterOption) 922 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 923 */ 924 @Deprecated 925 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts) 926 throws Exception { 927 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 928 .numRegionServers(numSlaves).numDataNodes(numSlaves).dataNodeHosts(dataNodeHosts).build(); 929 return startMiniCluster(option); 930 } 931 932 /** 933 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 934 * defined in {@link StartMiniClusterOption.Builder}. 935 * @param numMasters Master node number. 936 * @param numRegionServers Number of region servers. 937 * @param numDataNodes Number of datanodes. 938 * @return The mini HBase cluster created. 939 * @see #shutdownMiniCluster() 940 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 941 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 942 * @see #startMiniCluster(StartMiniClusterOption) 943 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 944 */ 945 @Deprecated 946 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes) 947 throws Exception { 948 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 949 .numRegionServers(numRegionServers).numDataNodes(numDataNodes).build(); 950 return startMiniCluster(option); 951 } 952 953 /** 954 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 955 * defined in {@link StartMiniClusterOption.Builder}. 956 * @param numMasters Master node number. 957 * @param numSlaves Slave node number, for both HBase region server and HDFS data node. 958 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will overwrite 959 * HDFS data node number. 
960 * @param masterClass The class to use as HMaster, or null for default. 961 * @param rsClass The class to use as HRegionServer, or null for default. 962 * @return The mini HBase cluster created. 963 * @see #shutdownMiniCluster() 964 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 965 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 966 * @see #startMiniCluster(StartMiniClusterOption) 967 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 968 */ 969 @Deprecated 970 public MiniHBaseCluster startMiniCluster(int numMasters, int numSlaves, String[] dataNodeHosts, 971 Class<? extends HMaster> masterClass, 972 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception { 973 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 974 .masterClass(masterClass).numRegionServers(numSlaves).rsClass(rsClass).numDataNodes(numSlaves) 975 .dataNodeHosts(dataNodeHosts).build(); 976 return startMiniCluster(option); 977 } 978 979 /** 980 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 981 * defined in {@link StartMiniClusterOption.Builder}. 982 * @param numMasters Master node number. 983 * @param numRegionServers Number of region servers. 984 * @param numDataNodes Number of datanodes. 985 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will 986 * overwrite HDFS data node number. 987 * @param masterClass The class to use as HMaster, or null for default. 988 * @param rsClass The class to use as HRegionServer, or null for default. 989 * @return The mini HBase cluster created. 990 * @see #shutdownMiniCluster() 991 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 992 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 
993 * @see #startMiniCluster(StartMiniClusterOption) 994 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 995 */ 996 @Deprecated 997 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes, 998 String[] dataNodeHosts, Class<? extends HMaster> masterClass, 999 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass) throws Exception { 1000 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1001 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass) 1002 .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).build(); 1003 return startMiniCluster(option); 1004 } 1005 1006 /** 1007 * Start up a minicluster of hbase, dfs, and zookeeper. All other options will use default values, 1008 * defined in {@link StartMiniClusterOption.Builder}. 1009 * @param numMasters Master node number. 1010 * @param numRegionServers Number of region servers. 1011 * @param numDataNodes Number of datanodes. 1012 * @param dataNodeHosts The hostnames of DataNodes to run on. If not null, its size will 1013 * overwrite HDFS data node number. 1014 * @param masterClass The class to use as HMaster, or null for default. 1015 * @param rsClass The class to use as HRegionServer, or null for default. 1016 * @param createRootDir Whether to create a new root or data directory path. 1017 * @param createWALDir Whether to create a new WAL directory. 1018 * @return The mini HBase cluster created. 1019 * @see #shutdownMiniCluster() 1020 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1021 * {@link #startMiniCluster(StartMiniClusterOption)} instead. 1022 * @see #startMiniCluster(StartMiniClusterOption) 1023 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1024 */ 1025 @Deprecated 1026 public MiniHBaseCluster startMiniCluster(int numMasters, int numRegionServers, int numDataNodes, 1027 String[] dataNodeHosts, Class<? 
extends HMaster> masterClass, 1028 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir, 1029 boolean createWALDir) throws Exception { 1030 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1031 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass) 1032 .numDataNodes(numDataNodes).dataNodeHosts(dataNodeHosts).createRootDir(createRootDir) 1033 .createWALDir(createWALDir).build(); 1034 return startMiniCluster(option); 1035 } 1036 1037 /** 1038 * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All 1039 * other options will use default values, defined in {@link StartMiniClusterOption.Builder}. 1040 * @param numSlaves slave node number, for both HBase region server and HDFS data node. 1041 * @see #startMiniCluster(StartMiniClusterOption option) 1042 * @see #shutdownMiniDFSCluster() 1043 */ 1044 public MiniHBaseCluster startMiniCluster(int numSlaves) throws Exception { 1045 StartMiniClusterOption option = 1046 StartMiniClusterOption.builder().numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 1047 return startMiniCluster(option); 1048 } 1049 1050 /** 1051 * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default 1052 * value can be found in {@link StartMiniClusterOption.Builder}. 1053 * @see #startMiniCluster(StartMiniClusterOption option) 1054 * @see #shutdownMiniDFSCluster() 1055 */ 1056 public MiniHBaseCluster startMiniCluster() throws Exception { 1057 return startMiniCluster(StartMiniClusterOption.builder().build()); 1058 } 1059 1060 /** 1061 * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies 1062 * Configuration. It homes the cluster data directory under a random subdirectory in a directory 1063 * under System property test.build.data, to be cleaned up on exit. 
   * @see #shutdownMiniDFSCluster()
   */
  public MiniHBaseCluster startMiniCluster(StartMiniClusterOption option) throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    // A pre-existing dfsCluster (e.g. injected via setDFSCluster) is reused as-is.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
   * @return Reference to the hbase mini hbase cluster.
   * @see #startMiniCluster(StartMiniClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the server waits until this exact number of
    // regions servers are connected.
    // Only applied when the caller has not configured them already (-1 sentinel).
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    // The cluster gets its own copy of the conf so later mutations of this.conf don't leak in.
    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (MiniHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartMiniClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartMiniClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster() throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartMiniClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartMiniClusterOption.Builder}.
1158 * @param numMasters Master node number. 1159 * @param numRegionServers Number of region servers. 1160 * @return The mini HBase cluster created. 1161 * @see #shutdownMiniHBaseCluster() 1162 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1163 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 1164 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1165 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1166 */ 1167 @Deprecated 1168 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers) 1169 throws IOException, InterruptedException { 1170 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1171 .numRegionServers(numRegionServers).build(); 1172 return startMiniHBaseCluster(option); 1173 } 1174 1175 /** 1176 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1177 * {@link #startMiniCluster()}. All other options will use default values, defined in 1178 * {@link StartMiniClusterOption.Builder}. 1179 * @param numMasters Master node number. 1180 * @param numRegionServers Number of region servers. 1181 * @param rsPorts Ports that RegionServer should use. 1182 * @return The mini HBase cluster created. 1183 * @see #shutdownMiniHBaseCluster() 1184 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1185 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 
1186 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1187 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1188 */ 1189 @Deprecated 1190 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1191 List<Integer> rsPorts) throws IOException, InterruptedException { 1192 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1193 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 1194 return startMiniHBaseCluster(option); 1195 } 1196 1197 /** 1198 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 1199 * {@link #startMiniCluster()}. All other options will use default values, defined in 1200 * {@link StartMiniClusterOption.Builder}. 1201 * @param numMasters Master node number. 1202 * @param numRegionServers Number of region servers. 1203 * @param rsPorts Ports that RegionServer should use. 1204 * @param masterClass The class to use as HMaster, or null for default. 1205 * @param rsClass The class to use as HRegionServer, or null for default. 1206 * @param createRootDir Whether to create a new root or data directory path. 1207 * @param createWALDir Whether to create a new WAL directory. 1208 * @return The mini HBase cluster created. 1209 * @see #shutdownMiniHBaseCluster() 1210 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 1211 * {@link #startMiniHBaseCluster(StartMiniClusterOption)} instead. 1212 * @see #startMiniHBaseCluster(StartMiniClusterOption) 1213 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 1214 */ 1215 @Deprecated 1216 public MiniHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 1217 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 1218 Class<? 
extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, boolean createRootDir, 1219 boolean createWALDir) throws IOException, InterruptedException { 1220 StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(numMasters) 1221 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 1222 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 1223 return startMiniHBaseCluster(option); 1224 } 1225 1226 /** 1227 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 1228 * want to keep dfs/zk up and just stop/start hbase. 1229 * @param servers number of region servers 1230 */ 1231 public void restartHBaseCluster(int servers) throws IOException, InterruptedException { 1232 this.restartHBaseCluster(servers, null); 1233 } 1234 1235 public void restartHBaseCluster(int servers, List<Integer> ports) 1236 throws IOException, InterruptedException { 1237 StartMiniClusterOption option = 1238 StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); 1239 restartHBaseCluster(option); 1240 invalidateConnection(); 1241 } 1242 1243 public void restartHBaseCluster(StartMiniClusterOption option) 1244 throws IOException, InterruptedException { 1245 closeConnection(); 1246 this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), 1247 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 1248 option.getMasterClass(), option.getRsClass()); 1249 // Don't leave here till we've done a successful scan of the hbase:meta 1250 Connection conn = ConnectionFactory.createConnection(this.conf); 1251 Table t = conn.getTable(TableName.META_TABLE_NAME); 1252 ResultScanner s = t.getScanner(new Scan()); 1253 while (s.next() != null) { 1254 // do nothing 1255 } 1256 LOG.info("HBase has been restarted"); 1257 s.close(); 1258 t.close(); 1259 conn.close(); 1260 } 1261 1262 /** 1263 * @return Current mini hbase 
cluster. Only has something in it after a call to 1264 * {@link #startMiniCluster()}. 1265 * @see #startMiniCluster() 1266 */ 1267 public MiniHBaseCluster getMiniHBaseCluster() { 1268 if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) { 1269 return (MiniHBaseCluster) this.hbaseCluster; 1270 } 1271 throw new RuntimeException( 1272 hbaseCluster + " not an instance of " + MiniHBaseCluster.class.getName()); 1273 } 1274 1275 /** 1276 * Stops mini hbase, zk, and hdfs clusters. 1277 * @see #startMiniCluster(int) 1278 */ 1279 public void shutdownMiniCluster() throws IOException { 1280 LOG.info("Shutting down minicluster"); 1281 shutdownMiniHBaseCluster(); 1282 shutdownMiniDFSCluster(); 1283 shutdownMiniZKCluster(); 1284 1285 cleanupTestDir(); 1286 miniClusterRunning = false; 1287 LOG.info("Minicluster is down"); 1288 } 1289 1290 /** 1291 * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running. 1292 * @throws java.io.IOException in case command is unsuccessful 1293 */ 1294 public void shutdownMiniHBaseCluster() throws IOException { 1295 cleanup(); 1296 if (this.hbaseCluster != null) { 1297 this.hbaseCluster.shutdown(); 1298 // Wait till hbase is down before going on to shutdown zk. 1299 this.hbaseCluster.waitUntilShutDown(); 1300 this.hbaseCluster = null; 1301 } 1302 if (zooKeeperWatcher != null) { 1303 zooKeeperWatcher.close(); 1304 zooKeeperWatcher = null; 1305 } 1306 } 1307 1308 /** 1309 * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running. 
   * @throws java.io.IOException throws in case command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      // killAll() terminates masters and region servers without a graceful stop.
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start (-1 restores the sentinel that
    // startMiniHBaseCluster checks before applying its own defaults)
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
   * startup. You'd only use this method if you were doing manual operation.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note : Directory will be made irrespective of whether
   *               path has been fetched or not. If directory already exists, it will be overwritten
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    // Publish the root dir in the conf before creating it so later readers agree on the path.
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)} except that
   * <code>create</code> flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates a hbase walDir in the user's home directory. Normally you won't make use of this
   * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use
   * this method if you were doing manual operation.
1386 * @return Fully qualified path to hbase root dir 1387 */ 1388 public Path createWALRootDir() throws IOException { 1389 FileSystem fs = FileSystem.get(this.conf); 1390 Path walDir = getNewDataTestDirOnTestFS(); 1391 CommonFSUtils.setWALRootDir(this.conf, walDir); 1392 fs.mkdirs(walDir); 1393 return walDir; 1394 } 1395 1396 private void setHBaseFsTmpDir() throws IOException { 1397 String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir"); 1398 if (hbaseFsTmpDirInString == null) { 1399 this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString()); 1400 LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir")); 1401 } else { 1402 LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString); 1403 } 1404 } 1405 1406 /** 1407 * Flushes all caches in the mini hbase cluster 1408 */ 1409 public void flush() throws IOException { 1410 getMiniHBaseCluster().flushcache(); 1411 } 1412 1413 /** 1414 * Flushes all caches in the mini hbase cluster 1415 */ 1416 public void flush(TableName tableName) throws IOException { 1417 getMiniHBaseCluster().flushcache(tableName); 1418 } 1419 1420 /** 1421 * Compact all regions in the mini hbase cluster 1422 */ 1423 public void compact(boolean major) throws IOException { 1424 getMiniHBaseCluster().compact(major); 1425 } 1426 1427 /** 1428 * Compact all of a table's reagion in the mini hbase cluster 1429 */ 1430 public void compact(TableName tableName, boolean major) throws IOException { 1431 getMiniHBaseCluster().compact(tableName, major); 1432 } 1433 1434 /** 1435 * Create a table. 1436 * @return A Table instance for the created table. 1437 */ 1438 public Table createTable(TableName tableName, String family) throws IOException { 1439 return createTable(tableName, new String[] { family }); 1440 } 1441 1442 /** 1443 * Create a table. 1444 * @return A Table instance for the created table. 
1445 */ 1446 public Table createTable(TableName tableName, String[] families) throws IOException { 1447 List<byte[]> fams = new ArrayList<>(families.length); 1448 for (String family : families) { 1449 fams.add(Bytes.toBytes(family)); 1450 } 1451 return createTable(tableName, fams.toArray(new byte[0][])); 1452 } 1453 1454 /** 1455 * Create a table. 1456 * @return A Table instance for the created table. 1457 */ 1458 public Table createTable(TableName tableName, byte[] family) throws IOException { 1459 return createTable(tableName, new byte[][] { family }); 1460 } 1461 1462 /** 1463 * Create a table with multiple regions. 1464 * @return A Table instance for the created table. 1465 */ 1466 public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions) 1467 throws IOException { 1468 if (numRegions < 3) throw new IOException("Must create at least 3 regions"); 1469 byte[] startKey = Bytes.toBytes("aaaaa"); 1470 byte[] endKey = Bytes.toBytes("zzzzz"); 1471 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 1472 1473 return createTable(tableName, new byte[][] { family }, splitKeys); 1474 } 1475 1476 /** 1477 * Create a table. 1478 * @return A Table instance for the created table. 1479 */ 1480 public Table createTable(TableName tableName, byte[][] families) throws IOException { 1481 return createTable(tableName, families, (byte[][]) null); 1482 } 1483 1484 /** 1485 * Create a table with multiple regions. 1486 * @return A Table instance for the created table. 1487 */ 1488 public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { 1489 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE); 1490 } 1491 1492 /** 1493 * Create a table with multiple regions. 1494 * @param replicaCount replica count. 1495 * @return A Table instance for the created table. 
 */
public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
  throws IOException {
  return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
  throws IOException {
  // Single region replica; fresh Configuration copy so callers' conf is not mutated.
  return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
}

/**
 * Create a table.
 * @param tableName the table name
 * @param families the families
 * @param splitKeys the splitkeys
 * @param replicaCount the region replica count
 * @return A Table instance for the created table.
 * @throws IOException throws IOException
 */
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
  int replicaCount) throws IOException {
  return createTable(tableName, families, splitKeys, replicaCount,
    new Configuration(getConfiguration()));
}

/**
 * Create a table split into {@code numRegions} regions over [startKey, endKey].
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
  byte[] endKey, int numRegions) throws IOException {
  TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

  getAdmin().createTable(desc, startKey, endKey, numRegions);
  // HBaseAdmin only waits for regions to appear in hbase:meta we
  // should wait until they are assigned
  waitUntilAllRegionsAssigned(tableName);
  return getConnection().getTable(tableName);
}

/**
 * Create a table.
 * @param c Configuration to use
 * @return A Table instance for the created table.
 */
public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
  throws IOException {
  return createTable(htd, families, null, c);
}

/**
 * Create a table.
 * @param htd table descriptor
 * @param families array of column families
 * @param splitKeys array of split keys
 * @param c Configuration to use
 * @return A Table instance for the created table.
 * @throws IOException if getAdmin or createTable fails
 */
public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
  Configuration c) throws IOException {
  // Disable blooms (they are on by default as of 0.95) but we disable them here because
  // tests have hard coded counts of what to expect in block cache, etc., and blooms being
  // on is interfering.
  return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
}

/**
 * Create a table.
 * @param htd table descriptor
 * @param families array of column families
 * @param splitKeys array of split keys
 * @param type Bloom type
 * @param blockSize block size
 * @param c Configuration to use
 * @return A Table instance for the created table.
 * @throws IOException if getAdmin or createTable fails
 */
public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
  BloomType type, int blockSize, Configuration c) throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
  for (byte[] family : families) {
    // Declare each requested family with the given bloom filter type and block size.
    ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
      .setBloomFilterType(type).setBlocksize(blockSize);
    if (isNewVersionBehaviorEnabled()) {
      cfdb.setNewVersionBehavior(true);
    }
    builder.setColumnFamily(cfdb.build());
  }
  TableDescriptor td = builder.build();
  if (splitKeys != null) {
    getAdmin().createTable(td, splitKeys);
  } else {
    getAdmin().createTable(td);
  }
  // HBaseAdmin only waits for regions to appear in hbase:meta
  // we should wait until they are assigned
  waitUntilAllRegionsAssigned(td.getTableName());
  return getConnection().getTable(td.getTableName());
}

/**
 * Create a table.
 * @param htd table descriptor
 * @param splitRows array of split keys
 * @return A Table instance for the created table.
 */
public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
  if (isNewVersionBehaviorEnabled()) {
    // Re-declare the descriptor's existing families with new-version-behavior turned on.
    for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
      builder.setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
    }
  }
  if (splitRows != null) {
    getAdmin().createTable(builder.build(), splitRows);
  } else {
    getAdmin().createTable(builder.build());
  }
  // HBaseAdmin only waits for regions to appear in hbase:meta
  // we should wait until they are assigned
  waitUntilAllRegionsAssigned(htd.getTableName());
  return getConnection().getTable(htd.getTableName());
}

/**
 * Create a table.
 * @param tableName the table name
 * @param families the families
 * @param splitKeys the split keys
 * @param replicaCount the replica count
 * @param c Configuration to use
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
  int replicaCount, final Configuration c) throws IOException {
  TableDescriptor htd =
    TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
  return createTable(htd, families, splitKeys, c);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
  return createTable(tableName, new byte[][] { family }, numVersions);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[][] families, int numVersions)
  throws IOException {
  return createTable(tableName, families, numVersions, (byte[][]) null);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[][] families, int numVersions,
  byte[][] splitKeys) throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  for (byte[] family : families) {
    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
    if (isNewVersionBehaviorEnabled()) {
      cfBuilder.setNewVersionBehavior(true);
    }
    builder.setColumnFamily(cfBuilder.build());
  }
  if (splitKeys != null) {
    getAdmin().createTable(builder.build(), splitKeys);
  } else {
    getAdmin().createTable(builder.build());
  }
  // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
  // assigned
  waitUntilAllRegionsAssigned(tableName);
  return getConnection().getTable(tableName);
}

/**
 * Create a table with multiple regions.
 * @return A Table instance for the created table.
 */
public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
  throws IOException {
  return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
1695 */ 1696 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1697 throws IOException { 1698 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1699 for (byte[] family : families) { 1700 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1701 .setMaxVersions(numVersions).setBlocksize(blockSize); 1702 if (isNewVersionBehaviorEnabled()) { 1703 cfBuilder.setNewVersionBehavior(true); 1704 } 1705 builder.setColumnFamily(cfBuilder.build()); 1706 } 1707 getAdmin().createTable(builder.build()); 1708 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1709 // assigned 1710 waitUntilAllRegionsAssigned(tableName); 1711 return getConnection().getTable(tableName); 1712 } 1713 1714 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1715 String cpName) throws IOException { 1716 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1717 for (byte[] family : families) { 1718 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1719 .setMaxVersions(numVersions).setBlocksize(blockSize); 1720 if (isNewVersionBehaviorEnabled()) { 1721 cfBuilder.setNewVersionBehavior(true); 1722 } 1723 builder.setColumnFamily(cfBuilder.build()); 1724 } 1725 if (cpName != null) { 1726 builder.setCoprocessor(cpName); 1727 } 1728 getAdmin().createTable(builder.build()); 1729 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1730 // assigned 1731 waitUntilAllRegionsAssigned(tableName); 1732 return getConnection().getTable(tableName); 1733 } 1734 1735 /** 1736 * Create a table. 1737 * @return A Table instance for the created table. 
 */
public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
  throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  int i = 0;
  // NOTE(review): numVersions is indexed in parallel with families — assumes
  // numVersions.length >= families.length; verify at call sites.
  for (byte[] family : families) {
    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
    if (isNewVersionBehaviorEnabled()) {
      cfBuilder.setNewVersionBehavior(true);
    }
    builder.setColumnFamily(cfBuilder.build());
    i++;
  }
  getAdmin().createTable(builder.build());
  // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
  // assigned
  waitUntilAllRegionsAssigned(tableName);
  return getConnection().getTable(tableName);
}

/**
 * Create a table.
 * @return A Table instance for the created table.
 */
public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
  throws IOException {
  TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
  ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
  if (isNewVersionBehaviorEnabled()) {
    cfBuilder.setNewVersionBehavior(true);
  }
  builder.setColumnFamily(cfBuilder.build());
  getAdmin().createTable(builder.build(), splitRows);
  // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
  // assigned
  waitUntilAllRegionsAssigned(tableName);
  return getConnection().getTable(tableName);
}

/**
 * Create a table with multiple regions.
 * @return A Table instance for the created table.
 */
public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
  return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
}

/**
 * Modify a table, synchronous.
 * @deprecated since 3.0.0 and will be removed in 4.0.0. Just use
 *             {@link Admin#modifyTable(TableDescriptor)} directly as it is synchronous now.
 * @see Admin#modifyTable(TableDescriptor)
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22002">HBASE-22002</a>
 */
@Deprecated
public static void modifyTableSync(Admin admin, TableDescriptor desc)
  throws IOException, InterruptedException {
  admin.modifyTable(desc);
}

/**
 * Set the number of Region replicas.
 */
public static void setReplicas(Admin admin, TableName table, int replicaCount)
  throws IOException, InterruptedException {
  // Rebuild the descriptor with the new replication count and apply it.
  TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
    .setRegionReplication(replicaCount).build();
  admin.modifyTable(desc);
}

/**
 * Drop an existing table
 * @param tableName existing table
 */
public void deleteTable(TableName tableName) throws IOException {
  try {
    getAdmin().disableTable(tableName);
  } catch (TableNotEnabledException e) {
    // Already disabled is fine — we only need it disabled before deleting.
    LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
  }
  getAdmin().deleteTable(tableName);
}

/**
 * Drop an existing table
 * @param tableName existing table
 */
public void deleteTableIfAny(TableName tableName) throws IOException {
  try {
    deleteTable(tableName);
  } catch (TableNotFoundException e) {
    // ignore
  }
}

// ==========================================================================
// Canned table and table descriptor creation

public final static byte[] fam1 = Bytes.toBytes("colfamily11");
public final static byte[] fam2 = Bytes.toBytes("colfamily21");
public final static byte[] fam3 = Bytes.toBytes("colfamily31");
public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
// Default max versions for canned table descriptors.
private static final int MAXVERSIONS = 3;

public static final char FIRST_CHAR = 'a'; 1844 public static final char LAST_CHAR = 'z'; 1845 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1846 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1847 1848 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1849 return createModifyableTableDescriptor(TableName.valueOf(name), 1850 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1851 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1852 } 1853 1854 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1855 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1856 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1857 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1858 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1859 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1860 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1861 if (isNewVersionBehaviorEnabled()) { 1862 cfBuilder.setNewVersionBehavior(true); 1863 } 1864 builder.setColumnFamily(cfBuilder.build()); 1865 } 1866 return builder.build(); 1867 } 1868 1869 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1870 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1871 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1872 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1873 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1874 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1875 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1876 if (isNewVersionBehaviorEnabled()) { 1877 cfBuilder.setNewVersionBehavior(true); 1878 
} 1879 builder.setColumnFamily(cfBuilder.build()); 1880 } 1881 return builder; 1882 } 1883 1884 /** 1885 * Create a table of name <code>name</code>. 1886 * @param name Name to give table. 1887 * @return Column descriptor. 1888 */ 1889 public TableDescriptor createTableDescriptor(final TableName name) { 1890 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1891 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1892 } 1893 1894 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1895 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1896 } 1897 1898 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1899 int maxVersions) { 1900 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1901 for (byte[] family : families) { 1902 ColumnFamilyDescriptorBuilder cfBuilder = 1903 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1904 if (isNewVersionBehaviorEnabled()) { 1905 cfBuilder.setNewVersionBehavior(true); 1906 } 1907 builder.setColumnFamily(cfBuilder.build()); 1908 } 1909 return builder.build(); 1910 } 1911 1912 /** 1913 * Create an HRegion that writes to the local tmp dirs 1914 * @param desc a table descriptor indicating which table the region belongs to 1915 * @param startKey the start boundary of the region 1916 * @param endKey the end boundary of the region 1917 * @return a region that writes to local dir for testing 1918 */ 1919 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1920 throws IOException { 1921 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1922 .setEndKey(endKey).build(); 1923 return createLocalHRegion(hri, desc); 1924 } 1925 1926 /** 1927 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. 
Be sure to call 1928 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it. 1929 */ 1930 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1931 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1932 } 1933 1934 /** 1935 * Create an HRegion that writes to the local tmp dirs with specified wal 1936 * @param info regioninfo 1937 * @param conf configuration 1938 * @param desc table descriptor 1939 * @param wal wal for this region. 1940 * @return created hregion 1941 */ 1942 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1943 WAL wal) throws IOException { 1944 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1945 } 1946 1947 /** 1948 * Return a region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} 1949 * when done. 1950 */ 1951 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1952 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1953 throws IOException { 1954 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1955 durability, wal, null, families); 1956 } 1957 1958 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1959 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1960 boolean[] compactedMemStore, byte[]... 
families) throws IOException { 1961 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1962 builder.setReadOnly(isReadOnly); 1963 int i = 0; 1964 for (byte[] family : families) { 1965 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1966 if (compactedMemStore != null && i < compactedMemStore.length) { 1967 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1968 } else { 1969 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1970 1971 } 1972 i++; 1973 // Set default to be three versions. 1974 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1975 builder.setColumnFamily(cfBuilder.build()); 1976 } 1977 builder.setDurability(durability); 1978 RegionInfo info = 1979 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1980 return createLocalHRegion(info, conf, builder.build(), wal); 1981 } 1982 1983 // 1984 // ========================================================================== 1985 1986 /** 1987 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1988 * read. 1989 * @param tableName existing table 1990 * @return HTable to that new table 1991 */ 1992 public Table deleteTableData(TableName tableName) throws IOException { 1993 Table table = getConnection().getTable(tableName); 1994 Scan scan = new Scan(); 1995 ResultScanner resScan = table.getScanner(scan); 1996 for (Result res : resScan) { 1997 Delete del = new Delete(res.getRow()); 1998 table.delete(del); 1999 } 2000 resScan = table.getScanner(scan); 2001 resScan.close(); 2002 return table; 2003 } 2004 2005 /** 2006 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 2007 * table. 2008 * @param tableName table which must exist. 
 * @param preserveRegions keep the existing split points
 * @return HTable for the new table
 */
public Table truncateTable(final TableName tableName, final boolean preserveRegions)
  throws IOException {
  Admin admin = getAdmin();
  // truncateTable requires the table disabled first.
  if (!admin.isTableDisabled(tableName)) {
    admin.disableTable(tableName);
  }
  admin.truncateTable(tableName, preserveRegions);
  return getConnection().getTable(tableName);
}

/**
 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
 * preserve regions of existing table.
 * @param tableName table which must exist.
 * @return HTable for the new table
 */
public Table truncateTable(final TableName tableName) throws IOException {
  return truncateTable(tableName, false);
}

/**
 * Load table with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Family
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[] f) throws IOException {
  return loadTable(t, new byte[][] { f });
}

/**
 * Load table with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Family
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
  return loadTable(t, new byte[][] { f }, null, writeToWAL);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f) throws IOException {
  return loadTable(t, f, null);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @param value the values of the cells. If null is passed, the row key is used as value
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
  return loadTable(t, f, value, true);
}

/**
 * Load table of multiple column families with rows from 'aaa' to 'zzz'.
 * @param t Table
 * @param f Array of Families to load
 * @param value the values of the cells. If null is passed, the row key is used as value
 * @return Count of rows loaded.
 */
public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
  throws IOException {
  List<Put> puts = new ArrayList<>();
  for (byte[] row : HBaseTestingUtility.ROWS) {
    Put put = new Put(row);
    put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
    for (int i = 0; i < f.length; i++) {
      // Qualifier equals the family name; value defaults to the row key when none given.
      byte[] value1 = value != null ? value : row;
      put.addColumn(f[i], f[i], value1);
    }
    puts.add(put);
  }
  t.put(puts);
  return puts.size();
}

/**
 * A tracker for tracking and validating table rows generated with
 * {@link HBaseTestingUtility#loadTable(Table, byte[])}
 */
public static class SeenRowTracker {
  // Row keys are three letters over 'a'..'z'; keep one counter per possible key.
  int dim = 'z' - 'a' + 1;
  int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
  byte[] startRow;
  byte[] stopRow;

  public SeenRowTracker(byte[] startRow, byte[] stopRow) {
    this.startRow = startRow;
    this.stopRow = stopRow;
  }

  // Zero every counter.
  void reset() {
    for (byte[] row : ROWS) {
      seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
    }
  }

  // Map a row byte ('a'..'z') to an index (0..25).
  int i(byte b) {
    return b - 'a';
  }

  public void addRow(byte[] row) {
    seenRows[i(row[0])][i(row[1])][i(row[2])]++;
  }

  /**
   * Validate that all the rows between startRow and stopRow are seen exactly once, and all other
   * rows none
   */
  public void validate() {
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          int count = seenRows[i(b1)][i(b2)][i(b3)];
          int expectedCount = 0;
          if (
            Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
              && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
          ) {
            expectedCount = 1;
          }
          if (count != expectedCount) {
            String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
            throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
              + "instead of " + expectedCount);
          }
        }
      }
    }
  }
}

public int loadRegion(final HRegion r, final byte[] f) throws IOException {
  return loadRegion(r, f, false);
}

public int loadRegion(final Region r, final byte[] f) throws IOException {
  return loadRegion((HRegion) r, f);
}

/**
 * Load region with rows from 'aaa' to 'zzz'.
 * @param r Region
 * @param f Family
 * @param flush flush the cache if true
 * @return Count of rows loaded.
 */
public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
  byte[] k = new byte[3];
  int rowCount = 0;
  for (byte b1 = 'a'; b1 <= 'z'; b1++) {
    for (byte b2 = 'a'; b2 <= 'z'; b2++) {
      for (byte b3 = 'a'; b3 <= 'z'; b3++) {
        k[0] = b1;
        k[1] = b2;
        k[2] = b3;
        Put put = new Put(k);
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(f, null, k);
        if (r.getWAL() == null) {
          put.setDurability(Durability.SKIP_WAL);
        }
        // Retry with exponential backoff (capped at 1s) until the region accepts the put.
        int preRowCount = rowCount;
        int pause = 10;
        int maxPause = 1000;
        while (rowCount == preRowCount) {
          try {
            r.put(put);
            rowCount++;
          } catch (RegionTooBusyException e) {
            pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
            Threads.sleep(pause);
          }
        }
      }
    }
    if (flush) {
      r.flush(true);
    }
  }
  return rowCount;
}

/** Writes one row per integer in [startRow, endRow); the value is the stringified key bytes. */
public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
  throws IOException {
  for (int i = startRow; i < endRow; i++) {
    byte[] data = Bytes.toBytes(String.valueOf(i));
    Put put = new Put(data);
    put.addColumn(f, null, data);
    t.put(put);
  }
}

/** Writes totalRows puts keyed by random byte[rowSize] row keys into family f. */
public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
  throws IOException {
  byte[] row = new byte[rowSize];
  for (int i = 0; i < totalRows; i++) {
    Bytes.random(row);
    Put put = new Put(row);
    put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
    t.put(put);
  }
}

/** Verifies rows written by loadNumericRows using TIMELINE-consistency gets at a replica. */
public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
  int replicaId) throws IOException {
  for (int i = startRow; i < endRow; i++) {
    String failMsg = "Failed verification of row :" + i;
    byte[] data = Bytes.toBytes(String.valueOf(i));
    Get get = new Get(data);
    get.setReplicaId(replicaId);
    get.setConsistency(Consistency.TIMELINE);
    Result result = table.get(get);
    if (!result.containsColumn(f, null)) {
      throw new AssertionError(failMsg);
    }
    assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
    Cell cell = result.getColumnLatestCell(f, null);
    if (
      !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength())
    ) {
      throw new AssertionError(failMsg);
    }
  }
}

public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
  throws IOException {
  verifyNumericRows((HRegion) region, f, startRow, endRow);
}

public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
  throws IOException {
  verifyNumericRows(region, f, startRow, endRow, true);
}

public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
  final boolean present) throws IOException {
  verifyNumericRows((HRegion) region, f, startRow, endRow, present);
}

/**
 * Verifies the numeric rows directly against a region: each row in [startRow, endRow) must be
 * present (or absent, when {@code present} is false) with exactly one matching cell.
 */
public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
  final boolean present) throws IOException {
  for (int i = startRow; i < endRow; i++) {
    String failMsg = "Failed verification of row :" + i;
    byte[] data = Bytes.toBytes(String.valueOf(i));
    Result result = region.get(new Get(data));

    boolean hasResult = result != null && !result.isEmpty();
    if (present != hasResult) {
      throw new AssertionError(
        failMsg + result + " expected:<" + present + "> but was:<" + hasResult + ">");
    }
    if (!present) continue;

    if (!result.containsColumn(f, null)) {
      throw new AssertionError(failMsg);
    }
    assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
    Cell cell = result.getColumnLatestCell(f, null);
    if (
      !Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength())
    ) {
      throw new AssertionError(failMsg);
    }
  }
}

/** Deletes family f from each numeric row in [startRow, endRow). */
public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
  throws IOException {
  for (int i = startRow; i < endRow; i++) {
    byte[] data = Bytes.toBytes(String.valueOf(i));
    Delete delete = new Delete(data);
    delete.addFamily(f);
    t.delete(delete);
  }
}

/**
 * Return the number of rows in the given table.
 * @param table to count rows
 * @return count of rows
 */
public static int countRows(final Table table) throws IOException {
  return countRows(table, new Scan());
}

public static int countRows(final Table table, final Scan scan) throws IOException {
  try (ResultScanner results = table.getScanner(scan)) {
    int count = 0;
    while (results.next() != null) {
      count++;
    }
    return count;
  }
}

/** Count rows, restricting the scan to the given families. */
public int countRows(final Table table, final byte[]... families) throws IOException {
  Scan scan = new Scan();
  for (byte[] family : families) {
    scan.addFamily(family);
  }
  return countRows(table, scan);
}

/**
 * Return the number of rows in the given table.
2330 */ 2331 public int countRows(final TableName tableName) throws IOException { 2332 Table table = getConnection().getTable(tableName); 2333 try { 2334 return countRows(table); 2335 } finally { 2336 table.close(); 2337 } 2338 } 2339 2340 public int countRows(final Region region) throws IOException { 2341 return countRows(region, new Scan()); 2342 } 2343 2344 public int countRows(final Region region, final Scan scan) throws IOException { 2345 InternalScanner scanner = region.getScanner(scan); 2346 try { 2347 return countRows(scanner); 2348 } finally { 2349 scanner.close(); 2350 } 2351 } 2352 2353 public int countRows(final InternalScanner scanner) throws IOException { 2354 int scannedCount = 0; 2355 List<Cell> results = new ArrayList<>(); 2356 boolean hasMore = true; 2357 while (hasMore) { 2358 hasMore = scanner.next(results); 2359 scannedCount += results.size(); 2360 results.clear(); 2361 } 2362 return scannedCount; 2363 } 2364 2365 /** 2366 * Return an md5 digest of the entire contents of a table. 
2367 */ 2368 public String checksumRows(final Table table) throws Exception { 2369 2370 Scan scan = new Scan(); 2371 ResultScanner results = table.getScanner(scan); 2372 MessageDigest digest = MessageDigest.getInstance("MD5"); 2373 for (Result res : results) { 2374 digest.update(res.getRow()); 2375 } 2376 results.close(); 2377 return digest.toString(); 2378 } 2379 2380 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2381 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2382 static { 2383 int i = 0; 2384 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2385 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2386 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2387 ROWS[i][0] = b1; 2388 ROWS[i][1] = b2; 2389 ROWS[i][2] = b3; 2390 i++; 2391 } 2392 } 2393 } 2394 } 2395 2396 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2397 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2398 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2399 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2400 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2401 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2402 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2403 2404 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2405 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2406 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2407 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2408 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2409 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 
Bytes.toBytes("vvv"), 2410 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2411 2412 /** 2413 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2414 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 2415 * @return list of region info for regions added to meta 2416 */ 2417 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2418 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2419 Table meta = getConnection().getTable(TableName.META_TABLE_NAME); 2420 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2421 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2422 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2423 TableState.State.ENABLED); 2424 // add custom ones 2425 for (int i = 0; i < startKeys.length; i++) { 2426 int j = (i + 1) % startKeys.length; 2427 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2428 .setEndKey(startKeys[j]).build(); 2429 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2430 newRegions.add(hri); 2431 } 2432 2433 meta.close(); 2434 return newRegions; 2435 } 2436 2437 /** 2438 * Create an unmanaged WAL. Be sure to close it when you're through. 2439 */ 2440 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri) 2441 throws IOException { 2442 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 2443 // unless I pass along via the conf. 2444 Configuration confForWAL = new Configuration(conf); 2445 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 2446 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); 2447 } 2448 2449 /** 2450 * Create a region with it's own WAL. 
Be sure to call 2451 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2452 */ 2453 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2454 final Configuration conf, final TableDescriptor htd) throws IOException { 2455 return createRegionAndWAL(info, rootDir, conf, htd, true); 2456 } 2457 2458 /** 2459 * Create a region with it's own WAL. Be sure to call 2460 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2461 */ 2462 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2463 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException { 2464 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2465 region.setBlockCache(blockCache); 2466 region.initialize(); 2467 return region; 2468 } 2469 2470 /** 2471 * Create a region with it's own WAL. Be sure to call 2472 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 2473 */ 2474 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2475 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache) 2476 throws IOException { 2477 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2478 region.setMobFileCache(mobFileCache); 2479 region.initialize(); 2480 return region; 2481 } 2482 2483 /** 2484 * Create a region with it's own WAL. Be sure to call 2485 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. 
2486 */ 2487 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2488 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2489 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2490 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2491 WAL wal = createWal(conf, rootDir, info); 2492 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2493 } 2494 2495 /** 2496 * Returns all rows from the hbase:meta table. 2497 * @throws IOException When reading the rows fails. 2498 */ 2499 public List<byte[]> getMetaTableRows() throws IOException { 2500 // TODO: Redo using MetaTableAccessor class 2501 Table t = getConnection().getTable(TableName.META_TABLE_NAME); 2502 List<byte[]> rows = new ArrayList<>(); 2503 ResultScanner s = t.getScanner(new Scan()); 2504 for (Result result : s) { 2505 LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow())); 2506 rows.add(result.getRow()); 2507 } 2508 s.close(); 2509 t.close(); 2510 return rows; 2511 } 2512 2513 /** 2514 * Returns all rows from the hbase:meta table for a given user table 2515 * @throws IOException When reading the rows fails. 2516 */ 2517 public List<byte[]> getMetaTableRows(TableName tableName) throws IOException { 2518 // TODO: Redo using MetaTableAccessor. 2519 Table t = getConnection().getTable(TableName.META_TABLE_NAME); 2520 List<byte[]> rows = new ArrayList<>(); 2521 ResultScanner s = t.getScanner(new Scan()); 2522 for (Result result : s) { 2523 RegionInfo info = CatalogFamilyFormat.getRegionInfo(result); 2524 if (info == null) { 2525 LOG.error("No region info for row " + Bytes.toString(result.getRow())); 2526 // TODO figure out what to do for this new hosed case. 
2527 continue; 2528 } 2529 2530 if (info.getTable().equals(tableName)) { 2531 LOG.info("getMetaTableRows: row -> " + Bytes.toStringBinary(result.getRow()) + info); 2532 rows.add(result.getRow()); 2533 } 2534 } 2535 s.close(); 2536 t.close(); 2537 return rows; 2538 } 2539 2540 /** 2541 * Returns all regions of the specified table 2542 * @param tableName the table name 2543 * @return all regions of the specified table 2544 * @throws IOException when getting the regions fails. 2545 */ 2546 private List<RegionInfo> getRegions(TableName tableName) throws IOException { 2547 try (Admin admin = getConnection().getAdmin()) { 2548 return admin.getRegions(tableName); 2549 } 2550 } 2551 2552 /** 2553 * Find any other region server which is different from the one identified by parameter 2554 * @return another region server 2555 */ 2556 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2557 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2558 if (!(rst.getRegionServer() == rs)) { 2559 return rst.getRegionServer(); 2560 } 2561 } 2562 return null; 2563 } 2564 2565 /** 2566 * Tool to get the reference to the region server object that holds the region of the specified 2567 * user table. 
2568 * @param tableName user table to lookup in hbase:meta 2569 * @return region server that holds it, null if the row doesn't exist 2570 */ 2571 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2572 throws IOException, InterruptedException { 2573 List<RegionInfo> regions = getRegions(tableName); 2574 if (regions == null || regions.isEmpty()) { 2575 return null; 2576 } 2577 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2578 2579 byte[] firstRegionName = 2580 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2581 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2582 2583 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2584 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2585 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2586 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2587 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2588 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2589 while (retrier.shouldRetry()) { 2590 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2591 if (index != -1) { 2592 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2593 } 2594 // Came back -1. Region may not be online yet. Sleep a while. 2595 retrier.sleepUntilNextRetry(); 2596 } 2597 return null; 2598 } 2599 2600 /** 2601 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2602 * @throws IOException When starting the cluster fails. 2603 */ 2604 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2605 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 
2606 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2607 "99.0"); 2608 startMiniMapReduceCluster(2); 2609 return mrCluster; 2610 } 2611 2612 /** 2613 * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its 2614 * internal static LOG_DIR variable. 2615 */ 2616 private void forceChangeTaskLogDir() { 2617 Field logDirField; 2618 try { 2619 logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); 2620 logDirField.setAccessible(true); 2621 2622 Field modifiersField = ReflectionUtils.getModifiersField(); 2623 modifiersField.setAccessible(true); 2624 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); 2625 2626 logDirField.set(null, new File(hadoopLogDir, "userlogs")); 2627 } catch (SecurityException e) { 2628 throw new RuntimeException(e); 2629 } catch (NoSuchFieldException e) { 2630 throw new RuntimeException(e); 2631 } catch (IllegalArgumentException e) { 2632 throw new RuntimeException(e); 2633 } catch (IllegalAccessException e) { 2634 throw new RuntimeException(e); 2635 } 2636 } 2637 2638 /** 2639 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2640 * filesystem. 2641 * @param servers The number of <code>TaskTracker</code>'s to start. 2642 * @throws IOException When starting the cluster fails. 2643 */ 2644 private void startMiniMapReduceCluster(final int servers) throws IOException { 2645 if (mrCluster != null) { 2646 throw new IllegalStateException("MiniMRCluster is already running"); 2647 } 2648 LOG.info("Starting mini mapreduce cluster..."); 2649 setupClusterTestDir(); 2650 createDirsAndSetProperties(); 2651 2652 forceChangeTaskLogDir(); 2653 2654 //// hadoop2 specific settings 2655 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2656 // we up the VM usable so that processes don't get killed. 
2657 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2658 2659 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2660 // this avoids the problem by disabling speculative task execution in tests. 2661 conf.setBoolean("mapreduce.map.speculative", false); 2662 conf.setBoolean("mapreduce.reduce.speculative", false); 2663 //// 2664 2665 // Allow the user to override FS URI for this map-reduce cluster to use. 2666 mrCluster = 2667 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2668 1, null, null, new JobConf(this.conf)); 2669 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2670 if (jobConf == null) { 2671 jobConf = mrCluster.createJobConf(); 2672 } 2673 // Hadoop MiniMR overwrites this while it should not 2674 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2675 LOG.info("Mini mapreduce cluster started"); 2676 2677 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2678 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2679 // necessary config properties here. YARN-129 required adding a few properties. 
2680 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2681 // this for mrv2 support; mr1 ignores this 2682 conf.set("mapreduce.framework.name", "yarn"); 2683 conf.setBoolean("yarn.is.minicluster", true); 2684 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2685 if (rmAddress != null) { 2686 conf.set("yarn.resourcemanager.address", rmAddress); 2687 } 2688 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2689 if (historyAddress != null) { 2690 conf.set("mapreduce.jobhistory.address", historyAddress); 2691 } 2692 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2693 if (schedulerAddress != null) { 2694 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2695 } 2696 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2697 if (mrJobHistoryWebappAddress != null) { 2698 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2699 } 2700 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2701 if (yarnRMWebappAddress != null) { 2702 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2703 } 2704 } 2705 2706 /** 2707 * Stops the previously started <code>MiniMRCluster</code>. 2708 */ 2709 public void shutdownMiniMapReduceCluster() { 2710 if (mrCluster != null) { 2711 LOG.info("Stopping mini mapreduce cluster..."); 2712 mrCluster.shutdown(); 2713 mrCluster = null; 2714 LOG.info("Mini mapreduce cluster stopped"); 2715 } 2716 // Restore configuration to point to local jobtracker 2717 conf.set("mapreduce.jobtracker.address", "local"); 2718 } 2719 2720 /** 2721 * Create a stubbed out RegionServerService, mainly for getting FS. 
2722 */ 2723 public RegionServerServices createMockRegionServerService() throws IOException { 2724 return createMockRegionServerService((ServerName) null); 2725 } 2726 2727 /** 2728 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2729 * TestTokenAuthentication 2730 */ 2731 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) 2732 throws IOException { 2733 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher()); 2734 rss.setFileSystem(getTestFileSystem()); 2735 rss.setRpcServer(rpc); 2736 return rss; 2737 } 2738 2739 /** 2740 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2741 * TestOpenRegionHandler 2742 */ 2743 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException { 2744 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name); 2745 rss.setFileSystem(getTestFileSystem()); 2746 return rss; 2747 } 2748 2749 /** 2750 * Switches the logger for the given class to DEBUG level. 2751 * @param clazz The class for which to switch to debug logging. 2752 * @deprecated In 2.3.0, will be removed in 4.0.0. Only support changing log level on log4j now as 2753 * HBase only uses log4j. You should do this by your own as it you know which log 2754 * framework you are using then set the log level to debug is very easy. 
2755 */ 2756 @Deprecated 2757 public void enableDebug(Class<?> clazz) { 2758 Log4jUtils.enableDebug(clazz); 2759 } 2760 2761 /** 2762 * Expire the Master's session 2763 */ 2764 public void expireMasterSession() throws Exception { 2765 HMaster master = getMiniHBaseCluster().getMaster(); 2766 expireSession(master.getZooKeeper(), false); 2767 } 2768 2769 /** 2770 * Expire a region server's session 2771 * @param index which RS 2772 */ 2773 public void expireRegionServerSession(int index) throws Exception { 2774 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); 2775 expireSession(rs.getZooKeeper(), false); 2776 decrementMinRegionServerCount(); 2777 } 2778 2779 private void decrementMinRegionServerCount() { 2780 // decrement the count for this.conf, for newly spwaned master 2781 // this.hbaseCluster shares this configuration too 2782 decrementMinRegionServerCount(getConfiguration()); 2783 2784 // each master thread keeps a copy of configuration 2785 for (MasterThread master : getHBaseCluster().getMasterThreads()) { 2786 decrementMinRegionServerCount(master.getMaster().getConfiguration()); 2787 } 2788 } 2789 2790 private void decrementMinRegionServerCount(Configuration conf) { 2791 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 2792 if (currentCount != -1) { 2793 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1)); 2794 } 2795 } 2796 2797 public void expireSession(ZKWatcher nodeZK) throws Exception { 2798 expireSession(nodeZK, false); 2799 } 2800 2801 /** 2802 * Expire a ZooKeeper session as recommended in ZooKeeper documentation <a href= 2803 * "https://hbase.apache.org/docs/troubleshooting#troubleshooting-zookeeper">Troubleshooting 2804 * ZooKeeper</a> 2805 * <p/> 2806 * There are issues when doing this: 2807 * <ol> 2808 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li> 2809 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li> 2810 * </ol> 
   * @param nodeZK      - the ZK watcher to expire
   * @param checkStatus - true to check if we can create a Table with the current configuration.
   */
  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
    // Reuse the victim's session id + password so the new clients share (and can kill) its session.
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();

    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event
    // other watchers on the same machine will get is as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so don't have to capture the event, just
    // closing the connection should be enough.
    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        LOG.info("Monitor ZKW received event=" + watchedEvent);
      }
    }, sessionID, password);

    // Making it expire
    ZooKeeper newZK =
      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);

    // ensure that we have connection to the server before closing down, otherwise
    // the close session event will be eaten out before we start CONNECTING state
    long start = EnvironmentEdgeManager.currentTime();
    while (
      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
    ) {
      Thread.sleep(1);
    }
    // Closing the second client with the shared session id expires the original session.
    newZK.close();
    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));

    // Now closing & waiting to be sure that the clients get it.
    monitor.close();

    if (checkStatus) {
      // Smoke-test that a Table can still be obtained after the expiry.
      getConnection().getTable(TableName.META_TABLE_NAME).close();
    }
  }

  /**
   * Get the Mini HBase cluster.
   * @return hbase cluster
   * @see #getHBaseClusterInterface()
   */
  public MiniHBaseCluster getHBaseCluster() {
    return getMiniHBaseCluster();
  }

  /**
   * Returns the HBaseCluster instance.
   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used
   * instead w/o the need to type-cast.
   */
  public HBaseCluster getHBaseClusterInterface() {
    // implementation note: we should rename this method as #getHBaseCluster(),
    // but this would require refactoring 90+ calls.
    return hbaseCluster;
  }

  /**
   * Resets the connections so that the next time getConnection() is called, a new connection is
   * created. This is needed in cases where the entire cluster / all the masters are shutdown and
   * the connection is not valid anymore. TODO: There should be a more coherent way of doing this.
   * Unfortunately the way tests are written, not all start() stop() calls go through this class.
   * Most tests directly operate on the underlying mini/local hbase cluster. That makes it difficult
   * for this wrapper class to maintain the connection state automatically. Cleaning this is a much
   * bigger refactor.
   */
  public void invalidateConnection() throws IOException {
    closeConnection();
    // Update the master addresses if they changed.
    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
    final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
      masterConfigBefore, masterConfAfter);
    conf.set(HConstants.MASTER_ADDRS_KEY,
      getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
  }

  /**
   * Get a shared Connection to the cluster. this method is thread safe.
   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
   */
  public Connection getConnection() throws IOException {
    return getAsyncConnection().toConnection();
  }

  /**
   * Get a assigned Connection to the cluster. this method is thread safe.
   * @param user assigned user
   * @return A Connection with assigned user.
   */
  public Connection getConnection(User user) throws IOException {
    return getAsyncConnection(user).toConnection();
  }

  /**
   * Get a shared AsyncClusterConnection to the cluster. this method is thread safe.
   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
   *         of cluster.
   */
  public AsyncClusterConnection getAsyncConnection() throws IOException {
    try {
      // updateAndGet gives atomic lazy init; IOException is smuggled out via UncheckedIOException
      // because the updater lambda cannot throw checked exceptions.
      return asyncConnection.updateAndGet(connection -> {
        if (connection == null) {
          try {
            User user = UserProvider.instantiate(conf).getCurrent();
            connection = getAsyncConnection(user);
          } catch (IOException ioe) {
            throw new UncheckedIOException("Failed to create connection", ioe);
          }
        }
        return connection;
      });
    } catch (UncheckedIOException exception) {
      // Unwrap back to the original checked IOException for callers.
      throw exception.getCause();
    }
  }

  /**
   * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe.
   * @param user assigned user
   * @return An AsyncClusterConnection with assigned user.
2945 */ 2946 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2947 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2948 } 2949 2950 public void closeConnection() throws IOException { 2951 if (hbaseAdmin != null) { 2952 Closeables.close(hbaseAdmin, true); 2953 hbaseAdmin = null; 2954 } 2955 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2956 if (asyncConnection != null) { 2957 Closeables.close(asyncConnection, true); 2958 } 2959 } 2960 2961 /** 2962 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2963 * it has no effect, it will be closed automatically when the cluster shutdowns 2964 */ 2965 public Admin getAdmin() throws IOException { 2966 if (hbaseAdmin == null) { 2967 this.hbaseAdmin = getConnection().getAdmin(); 2968 } 2969 return hbaseAdmin; 2970 } 2971 2972 public KeymetaAdminClient getKeymetaAdmin() throws IOException { 2973 return new KeymetaAdminClient(getConnection()); 2974 } 2975 2976 /** 2977 * Returns an {@link Hbck} instance. Needs be closed when done. 2978 */ 2979 public Hbck getHbck() throws IOException { 2980 return getConnection().getHbck(); 2981 } 2982 2983 /** 2984 * Unassign the named region. 2985 * @param regionName The region to unassign. 2986 */ 2987 public void unassignRegion(String regionName) throws IOException { 2988 unassignRegion(Bytes.toBytes(regionName)); 2989 } 2990 2991 /** 2992 * Unassign the named region. 2993 * @param regionName The region to unassign. 2994 */ 2995 public void unassignRegion(byte[] regionName) throws IOException { 2996 getAdmin().unassign(regionName, true); 2997 } 2998 2999 /** 3000 * Closes the region containing the given row. 3001 * @param row The row to find the containing region. 3002 * @param table The table to find the region. 
3003 */ 3004 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 3005 unassignRegionByRow(Bytes.toBytes(row), table); 3006 } 3007 3008 /** 3009 * Closes the region containing the given row. 3010 * @param row The row to find the containing region. 3011 * @param table The table to find the region. 3012 */ 3013 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 3014 HRegionLocation hrl = table.getRegionLocation(row); 3015 unassignRegion(hrl.getRegion().getRegionName()); 3016 } 3017 3018 /** 3019 * Retrieves a splittable region randomly from tableName 3020 * @param tableName name of table 3021 * @param maxAttempts maximum number of attempts, unlimited for value of -1 3022 * @return the HRegion chosen, null if none was found within limit of maxAttempts 3023 */ 3024 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 3025 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 3026 int regCount = regions.size(); 3027 Set<Integer> attempted = new HashSet<>(); 3028 int idx; 3029 int attempts = 0; 3030 do { 3031 regions = getHBaseCluster().getRegions(tableName); 3032 if (regCount != regions.size()) { 3033 // if there was region movement, clear attempted Set 3034 attempted.clear(); 3035 } 3036 regCount = regions.size(); 3037 // There are chances that before we get the region for the table from an RS the region may 3038 // be going for CLOSE. 
      // This may be because online schema change is enabled
      if (regCount > 0) {
        idx = ThreadLocalRandom.current().nextInt(regCount);
        // if we have just tried this region, there is no need to try again
        if (attempted.contains(idx)) {
          // NOTE(review): this 'continue' skips the attempts++ below, so once every region has
          // been attempted the loop can spin without ever counting an attempt — confirm intended.
          continue;
        }
        HRegion region = regions.get(idx);
        if (region.checkSplit().isPresent()) {
          return region;
        }
        attempted.add(idx);
      }
      attempts++;
    } while (maxAttempts == -1 || attempts < maxAttempts);
    return null;
  }

  /** Returns the mini DFS cluster backing this utility, or null if none is running. */
  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }

  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
    setDFSCluster(cluster, true);
  }

  /**
   * Set the MiniDFSCluster
   * @param cluster     cluster to use
   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it
   *                    is set.
   * @throws IllegalStateException if the passed cluster is up when it is required to be down
   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
   */
  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
    throws IllegalStateException, IOException {
    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
    }
    this.dfsCluster = cluster;
    this.setFs();
  }

  /** Returns an HFileSystem view over the test configuration's default filesystem. */
  public FileSystem getTestFileSystem() throws IOException {
    return HFileSystem.get(conf);
  }

  /**
   * Wait until all regions in a table have been assigned. Waits default timeout before giving up
   * (30 seconds).
   * @param table Table to wait on.
   */
  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
    waitTableAvailable(table.getName(), 30000);
  }

  public void waitTableAvailable(TableName table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(table));
  }

  /**
   * Wait until all regions in a table have been assigned
   * @param table         Table to wait on.
   * @param timeoutMillis Timeout.
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
  }

  /**
   * Build a human-readable explanation of why a table may not be available: its meta state plus,
   * for each region, any mismatch between the meta-recorded location and the assignment manager.
   */
  public String explainTableAvailability(TableName tableName) throws IOException {
    StringBuilder msg =
      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
    // Only the active master can report assignments; skip the comparison if it is gone.
    if (getHBaseCluster().getMaster().isAlive()) {
      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
        .getRegionStates().getRegionAssignments();
      final List<Pair<RegionInfo, ServerName>> metaLocations =
        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
        RegionInfo hri = metaLocation.getFirst();
        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM ( ").append(sn)
            .append(" <> ").append(assignments.get(hri));
        }
      }
    }
    return msg.toString();
  }

  /**
   * Describes, for diagnostics, whether the table's state in hbase:meta matches the expected
   * state.
   * @param table the table to check
   * @param state the expected state
   * @return "OK" style message if the state matches, otherwise an explanation of the mismatch
   */
  public String explainTableState(final TableName table, TableState.State state)
    throws IOException {
    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
    if (tableState == null) {
      return "TableState in META: No table state in META for table " + table
        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
    } else if (!tableState.inStates(state)) {
      return "TableState in META: Not " + state + " state, but " + tableState;
    } else {
      return "TableState in META: OK";
    }
  }

  /**
   * Scans hbase:meta (including rows for deleted tables) and returns the last table state seen for
   * the given table, or null if none is found.
   * @param table the table whose state to look up
   */
  public TableState findLastTableState(final TableName table) throws IOException {
    // AtomicReference is used only as a mutable holder for the anonymous visitor; the scan
    // itself is single-threaded.
    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result r) throws IOException {
        // Stop scanning as soon as we leave this table's row.
        if (!Arrays.equals(r.getRow(), table.getName())) {
          return false;
        }
        TableState state = CatalogFamilyFormat.getTableState(r);
        if (state != null) {
          lastTableState.set(state);
        }
        return true;
      }
    };
    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
      Integer.MAX_VALUE, visitor);
    return lastTableState.get();
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
   * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent
   * table.
   * @param table the table to wait on.
   * @throws InterruptedException if interrupted while waiting
   * @throws IOException          if an IO problem is encountered
   */
  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
    waitTableEnabled(table, 30000);
  }

  /**
   * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions
   * have been all assigned.
   * @see #waitTableEnabled(TableName, long)
   * @param table         Table to wait on.
   * @param timeoutMillis Time to wait on it being marked enabled.
   */
  public void waitTableEnabled(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
  }

  /**
   * Waits for a table to be 'enabled' (marked enabled and all regions assigned), up to the given
   * timeout.
   * @param table         Table to wait on.
   * @param timeoutMillis Time to wait on it being marked enabled.
   */
  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
    waitFor(timeoutMillis, predicateTableEnabled(table));
  }

  /**
   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout
   * after default period (30 seconds)
   * @param table Table to wait on.
   */
  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
    waitTableDisabled(table, 30000);
  }

  /**
   * Waits for a table to be 'disabled', up to the given timeout.
   * @param table         Table to wait on.
   * @param millisTimeout Time to wait on it being marked disabled.
   */
  public void waitTableDisabled(TableName table, long millisTimeout)
    throws InterruptedException, IOException {
    waitFor(millisTimeout, predicateTableDisabled(table));
  }

  /**
   * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled'
   * @param table         Table to wait on.
   * @param timeoutMillis Time to wait on it being marked disabled.
   */
  public void waitTableDisabled(byte[] table, long timeoutMillis)
    throws InterruptedException, IOException {
    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
  }

  /**
   * Make sure that at least the specified number of region servers are running
   * @param num minimum number of region servers that should be running
   * @return true if we started some servers
   */
  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
    boolean startedServer = false;
    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
    // Start servers until the live-thread count reaches the requested minimum.
    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
      startedServer = true;
    }

    return startedServer;
  }

  /**
   * Make sure that at least the specified number of region servers are running. We don't count the
   * ones that are currently stopping or are stopped.
   * @param num minimum number of region servers that should be running
   * @return true if we started some servers
   */
  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
    boolean startedServer = ensureSomeRegionServersAvailable(num);

    // Count only servers that are neither stopping nor stopped.
    int nonStoppedServers = 0;
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {

      HRegionServer hrs = rst.getRegionServer();
      if (hrs.isStopping() || hrs.isStopped()) {
        LOG.info("A region server is stopped or stopping:" + hrs);
      } else {
        nonStoppedServers++;
      }
    }
    // Top up with fresh servers to reach the requested minimum of healthy ones.
    for (int i = nonStoppedServers; i < num; ++i) {
      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
      startedServer = true;
    }
    return startedServer;
  }

  /**
   * This method clones the passed <code>c</code> configuration setting a new user into the clone.
   * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos.
   * @param c                     Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new configuration instance with a different user set into it.
   */
  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
    throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    // Local FS or secure clusters: a per-daemon user is unnecessary/unsupported.
    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
      return User.getCurrent();
    }
    // Else distributed filesystem. Make a new instance per daemon. Below
    // code is taken from the AppendTestUtil over in hdfs.
    String username = User.getCurrent().getName() + differentiatingSuffix;
    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
    return user;
  }

  /**
   * Collects the names of all regions currently online across the live region servers of the
   * given mini cluster.
   * @param cluster cluster to inspect
   * @return sorted set of region names (as strings)
   */
  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      try {
        for (RegionInfo region : ProtobufUtil
          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
          online.add(region.getRegionNameAsString());
        }
      } catch (RegionServerStoppedException e) {
        // That's fine.
      }
    }
    return online;
  }

  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests
   * linger. Here is the exception you'll see:
   *
   * <pre>
   * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block
   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
   * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683
   * failed 4 times.
   * Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
   * </pre>
   *
   * @param stream A DFSClient.DFSOutputStream.
   * @param max    the new value for the private maxRecoveryErrorCount field
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
    try {
      // The target class is a private nested class of DFSClient, so we must find it and set
      // the field reflectively. Failures are logged and ignored (best effort).
      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz : clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      LOG.info("Could not set max recovery field", e);
    }
  }

  /**
   * Uses directly the assignment manager to assign the region. and waits until the specified region
   * has completed assignment.
   * @return true if the region is assigned false otherwise.
   */
  public boolean assignRegion(final RegionInfo regionInfo)
    throws IOException, InterruptedException {
    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
    am.assign(regionInfo);
    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
  }

  /**
   * Move region to destination server and wait till region is completely moved and online
   * @param destRegion region to move
   * @param destServer destination server of the region
   */
  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
    throws InterruptedException, IOException {
    HMaster master = getMiniHBaseCluster().getMaster();
    // TODO: Here we start the move. The move can take a while.
    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
    // Poll the AssignmentManager until it reports the region on the destination server,
    // then double-check the server's own online-region list.
    while (true) {
      ServerName serverName =
        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
      if (serverName != null && serverName.equals(destServer)) {
        assertRegionOnServer(destRegion, serverName, 2000);
        break;
      }
      Thread.sleep(10);
    }
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds) This means all regions have been deployed,
   * master has been informed and updated hbase:meta with the regions deployed server.
   * @param tableName the table name
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
    waitUntilAllRegionsAssigned(tableName,
      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
  }

  /**
   * Wait until all system table's regions get assigned
   */
  public void waitUntilAllSystemRegionsAssigned() throws IOException {
    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
  }

  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, master has been informed and updated
   * hbase:meta with the regions deployed server.
   * @param tableName the table name
   * @param timeout   timeout, in milliseconds
   */
  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
    throws IOException {
    // hbase:meta itself cannot be checked by scanning meta; skip the meta-scan phase for it.
    if (!TableName.isMetaTableName(tableName)) {
      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
          + timeout + "ms");
        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
          @Override
          public String explainFailure() throws IOException {
            return explainTableAvailability(tableName);
          }

          @Override
          public boolean evaluate() throws IOException {
            Scan scan = new Scan();
            scan.addFamily(HConstants.CATALOG_FAMILY);
            boolean tableFound = false;
            try (ResultScanner s = meta.getScanner(scan)) {
              for (Result r; (r = s.next()) != null;) {
                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
                RegionInfo info = RegionInfo.parseFromOrNull(b);
                if (info != null && info.getTable().equals(tableName)) {
                  // Get server hosting this region from catalog family. Return false if no server
                  // hosting this region, or if the server hosting this region was recently killed
                  // (for fault tolerance testing).
                  tableFound = true;
                  byte[] server =
                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
                  if (server == null) {
                    return false;
                  } else {
                    byte[] startCode =
                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
                    ServerName serverName =
                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
                        + Bytes.toLong(startCode));
                    if (
                      !getHBaseClusterInterface().isDistributedCluster()
                        && getHBaseCluster().isKilledRS(serverName)
                    ) {
                      return false;
                    }
                  }
                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
                    return false;
                  }
                }
              }
            }
            if (!tableFound) {
              LOG.warn(
                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
            }
            return tableFound;
          }
        });
      }
    }
    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
    // check from the master state if we are using a mini cluster
    if (!getHBaseClusterInterface().isDistributedCluster()) {
      // So, all regions are in the meta table but make sure master knows of the assignments before
      // returning -- sometimes this can lag.
      HMaster master = getHBaseCluster().getMaster();
      final RegionStates states = master.getAssignmentManager().getRegionStates();
      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
        @Override
        public String explainFailure() throws IOException {
          return explainTableAvailability(tableName);
        }

        @Override
        public boolean evaluate() throws IOException {
          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
          return hris != null && !hris.isEmpty();
        }
      });
    }
    LOG.info("All regions for table " + tableName + " assigned.");
  }

  /**
   * Do a small get/scan against one store. This is required because store has no actual methods of
   * querying itself, and relies on StoreScanner.
   */
  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
      // readpoint 0.
      0);

    List<Cell> result = new ArrayList<>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // verify that we are on the row we want:
      Cell kv = result.get(0);
      if (!CellUtil.matchingRows(kv, get.getRow())) {
        result.clear();
      }
    }
    scanner.close();
    return result;
  }

  /**
   * Create region split keys between startkey and endKey
   * @param numRegions the number of regions to be created. it has to be greater than 3.
3498 * @return resulting split keys 3499 */ 3500 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3501 if (numRegions <= 3) { 3502 throw new AssertionError(); 3503 } 3504 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3505 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3506 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3507 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3508 return result; 3509 } 3510 3511 /** 3512 * Do a small get/scan against one store. This is required because store has no actual methods of 3513 * querying itself, and relies on StoreScanner. 3514 */ 3515 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3516 throws IOException { 3517 Get get = new Get(row); 3518 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3519 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3520 3521 return getFromStoreFile(store, get); 3522 } 3523 3524 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3525 final List<? extends Cell> actual) { 3526 final int eLen = expected.size(); 3527 final int aLen = actual.size(); 3528 final int minLen = Math.min(eLen, aLen); 3529 3530 int i; 3531 for (i = 0; i < minLen 3532 && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0; ++i) { 3533 } 3534 3535 if (additionalMsg == null) { 3536 additionalMsg = ""; 3537 } 3538 if (!additionalMsg.isEmpty()) { 3539 additionalMsg = ". " + additionalMsg; 3540 } 3541 3542 if (eLen != aLen || i != minLen) { 3543 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3544 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. 
" + safeGetAsStr(actual, i) 3545 + " (length " + aLen + ")" + additionalMsg); 3546 } 3547 } 3548 3549 public static <T> String safeGetAsStr(List<T> lst, int i) { 3550 if (0 <= i && i < lst.size()) { 3551 return lst.get(i).toString(); 3552 } else { 3553 return "<out_of_range>"; 3554 } 3555 } 3556 3557 public String getRpcConnnectionURI() throws UnknownHostException { 3558 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3559 } 3560 3561 public String getZkConnectionURI() { 3562 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3563 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3564 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3565 } 3566 3567 /** 3568 * Get the zk based cluster key for this cluster. 3569 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the 3570 * connection info of a cluster. Keep here only for compatibility. 3571 * @see #getRpcConnnectionURI() 3572 * @see #getZkConnectionURI() 3573 */ 3574 @Deprecated 3575 public String getClusterKey() { 3576 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3577 + ":" 3578 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3579 } 3580 3581 /** Creates a random table with the given parameters */ 3582 public Table createRandomTable(TableName tableName, final Collection<String> families, 3583 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3584 final int numRowsPerFlush) throws IOException, InterruptedException { 3585 3586 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3587 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3588 + maxVersions + "\n"); 3589 3590 final int numCF = families.size(); 3591 final byte[][] cfBytes = new byte[numCF][]; 3592 { 3593 int cfIndex = 0; 3594 for (String cf : 
families) { 3595 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3596 } 3597 } 3598 3599 final int actualStartKey = 0; 3600 final int actualEndKey = Integer.MAX_VALUE; 3601 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3602 final int splitStartKey = actualStartKey + keysPerRegion; 3603 final int splitEndKey = actualEndKey - keysPerRegion; 3604 final String keyFormat = "%08x"; 3605 final Table table = createTable(tableName, cfBytes, maxVersions, 3606 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3607 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3608 3609 if (hbaseCluster != null) { 3610 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3611 } 3612 3613 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3614 3615 final Random rand = ThreadLocalRandom.current(); 3616 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3617 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3618 final byte[] row = Bytes.toBytes( 3619 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3620 3621 Put put = new Put(row); 3622 Delete del = new Delete(row); 3623 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3624 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3625 final long ts = rand.nextInt(); 3626 final byte[] qual = Bytes.toBytes("col" + iCol); 3627 if (rand.nextBoolean()) { 3628 final byte[] value = 3629 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3630 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3631 put.addColumn(cf, qual, ts, value); 3632 } else if (rand.nextDouble() < 0.8) { 3633 del.addColumn(cf, qual, ts); 3634 } else { 3635 del.addColumns(cf, qual, ts); 3636 } 3637 } 3638 3639 if (!put.isEmpty()) { 3640 mutator.mutate(put); 3641 } 3642 3643 if (!del.isEmpty()) { 3644 mutator.mutate(del); 3645 } 3646 } 3647 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3648 mutator.flush(); 3649 if 
(hbaseCluster != null) { 3650 getMiniHBaseCluster().flushcache(table.getName()); 3651 } 3652 } 3653 mutator.close(); 3654 3655 return table; 3656 } 3657 3658 public static int randomFreePort() { 3659 return HBaseCommonTestingUtility.randomFreePort(); 3660 } 3661 3662 public static String randomMultiCastAddress() { 3663 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3664 } 3665 3666 public static void waitForHostPort(String host, int port) throws IOException { 3667 final int maxTimeMs = 10000; 3668 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3669 IOException savedException = null; 3670 LOG.info("Waiting for server at " + host + ":" + port); 3671 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3672 try { 3673 Socket sock = new Socket(InetAddress.getByName(host), port); 3674 sock.close(); 3675 savedException = null; 3676 LOG.info("Server at " + host + ":" + port + " is available"); 3677 break; 3678 } catch (UnknownHostException e) { 3679 throw new IOException("Failed to look up " + host, e); 3680 } catch (IOException e) { 3681 savedException = e; 3682 } 3683 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3684 } 3685 3686 if (savedException != null) { 3687 throw savedException; 3688 } 3689 } 3690 3691 /** 3692 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3693 * continues. 3694 * @return the number of regions the table was split into 3695 */ 3696 public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, 3697 byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding) 3698 throws IOException { 3699 return createPreSplitLoadTestTable(conf, tableName, columnFamily, compression, 3700 dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, Durability.USE_DEFAULT); 3701 } 3702 3703 /** 3704 * Creates a pre-split table for load testing. If the table already exists, logs a warning and 3705 * continues. 
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
    byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding,
    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
    // Build the table and single column-family descriptors, then delegate to the
    // descriptor-based overload.
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setDurability(durability);
    builder.setRegionReplication(regionReplication);
    ColumnFamilyDescriptorBuilder cfBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(columnFamily);
    cfBuilder.setDataBlockEncoding(dataBlockEncoding);
    cfBuilder.setCompressionType(compression);
    return createPreSplitLoadTestTable(conf, builder.build(), cfBuilder.build(),
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName,
    byte[][] columnFamilies, Algorithm compression, DataBlockEncoding dataBlockEncoding,
    int numRegionsPerServer, int regionReplication, Durability durability) throws IOException {
    // Same as the single-family overload, but applies the same encoding/compression to
    // every requested column family.
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setDurability(durability);
    builder.setRegionReplication(regionReplication);
    ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
    for (int i = 0; i < columnFamilies.length; i++) {
      ColumnFamilyDescriptorBuilder cfBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(columnFamilies[i]);
      cfBuilder.setDataBlockEncoding(dataBlockEncoding);
      cfBuilder.setCompressionType(compression);
      hcds[i] = cfBuilder.build();
    }
    return createPreSplitLoadTestTable(conf, builder.build(), hcds, numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor hcd) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] { hcd },
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
   * @return the number of regions the table was split into
   */
  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor desc,
    ColumnFamilyDescriptor[] hcds, int numRegionsPerServer) throws IOException {
    // Default split algorithm is hex-string keyspace partitioning.
    return createPreSplitLoadTestTable(conf, desc, hcds, new RegionSplitter.HexStringSplit(),
      numRegionsPerServer);
  }

  /**
   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
   * continues.
3779 * @return the number of regions the table was split into 3780 */ 3781 public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td, 3782 ColumnFamilyDescriptor[] cds, SplitAlgorithm splitter, int numRegionsPerServer) 3783 throws IOException { 3784 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td); 3785 for (ColumnFamilyDescriptor cd : cds) { 3786 if (!td.hasColumnFamily(cd.getName())) { 3787 builder.setColumnFamily(cd); 3788 } 3789 } 3790 td = builder.build(); 3791 int totalNumberOfRegions = 0; 3792 Connection unmanagedConnection = ConnectionFactory.createConnection(conf); 3793 Admin admin = unmanagedConnection.getAdmin(); 3794 3795 try { 3796 // create a table a pre-splits regions. 3797 // The number of splits is set as: 3798 // region servers * regions per region server). 3799 int numberOfServers = admin.getRegionServers().size(); 3800 if (numberOfServers == 0) { 3801 throw new IllegalStateException("No live regionservers"); 3802 } 3803 3804 totalNumberOfRegions = numberOfServers * numRegionsPerServer; 3805 LOG.info("Number of live regionservers: " + numberOfServers + ", " 3806 + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: " 3807 + numRegionsPerServer + ")"); 3808 3809 byte[][] splits = splitter.split(totalNumberOfRegions); 3810 3811 admin.createTable(td, splits); 3812 } catch (MasterNotRunningException e) { 3813 LOG.error("Master not running", e); 3814 throw new IOException(e); 3815 } catch (TableExistsException e) { 3816 LOG.warn("Table " + td.getTableName() + " already exists, continuing"); 3817 } finally { 3818 admin.close(); 3819 unmanagedConnection.close(); 3820 } 3821 return totalNumberOfRegions; 3822 } 3823 3824 public static int getMetaRSPort(Connection connection) throws IOException { 3825 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3826 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3827 } 3828 } 3829 
  /**
   * Due to async racing issue, a region may not be in the online region list of a region server
   * yet, after the assignment znode is deleted and the new assignment is recorded in master.
   */
  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    // Poll the server's region list until the region appears or the timeout elapses.
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    throw new AssertionError(
      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  /**
   * Check to make sure the region is open on the specified region server, but not on any other one.
   */
  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
    final long timeout) throws IOException, InterruptedException {
    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
    while (true) {
      List<RegionInfo> regions = getAdmin().getRegions(server);
      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
        // Found on the expected server; now verify no other live server also hosts it.
        List<JVMClusterUtil.RegionServerThread> rsThreads =
          getHBaseCluster().getLiveRegionServerThreads();
        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
          HRegionServer rs = rsThread.getRegionServer();
          if (server.equals(rs.getServerName())) {
            continue;
          }
          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
          for (HRegion r : hrs) {
            if (r.getRegionInfo().getRegionId() == hri.getRegionId()) {
              throw new AssertionError("Region should not be double assigned");
            }
          }
        }
        return; // good, we are happy
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > timeoutTime) break;
      Thread.sleep(10);
    }
    throw new AssertionError(
      "Could not find region " + hri.getRegionNameAsString() + " on server " + server);
  }

  /**
   * Creates a standalone region (with a WAL) for the given table name and column family, rooted
   * under the data test directory.
   */
  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
  }

  /**
   * Creates a standalone region (with a WAL) for the given table name and column family, using the
   * supplied block cache.
   */
  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
    BlockCache blockCache) throws IOException {
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
  }

  /** Overrides the filesystem URI used by this utility. */
  public static void setFileSystemURI(String fsURI) {
    FS_URI = fsURI;
  }

  /**
   * Returns a {@link Predicate} for checking that there are no regions in transition in master
   */
  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        final AssignmentManager am = getMiniHBaseCluster().getMaster().getAssignmentManager();
        return "found in transition: " + am.getRegionsInTransition().toString();
      }

      @Override
      public boolean evaluate() throws IOException {
        // Master or AM may not be up yet during cluster startup/shutdown.
        HMaster master = getMiniHBaseCluster().getMaster();
        if (master == null) return false;
        AssignmentManager am = master.getAssignmentManager();
        if (am == null) return false;
        return !am.hasRegionsInTransition();
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is enabled
   */
  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.ENABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is disabled
   */
  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableState(tableName, TableState.State.DISABLED);
      }

      @Override
      public boolean evaluate() throws IOException {
        return getAdmin().isTableDisabled(tableName);
      }
    };
  }

  /**
   * Returns a {@link Predicate} for checking that table is available
   */
  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
    return new ExplainingPredicate<IOException>() {
      @Override
      public String explainFailure() throws IOException {
        return explainTableAvailability(tableName);
      }

      @Override
      public boolean evaluate() throws IOException {
        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
        if (tableAvailable) {
          // Beyond the admin check, prove each region is actually readable by issuing a
          // one-row scan against every region of every column family.
          try (Table table = getConnection().getTable(tableName)) {
            TableDescriptor htd = table.getDescriptor();
            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
              .getAllRegionLocations()) {
              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
              for (byte[] family : htd.getColumnFamilyNames()) {
                scan.addFamily(family);
              }
              try (ResultScanner scanner = table.getScanner(scan)) {
                scanner.next();
              }
            }
          }
        }
        return tableAvailable;
      }
    };
  }

  /**
   * Wait until no regions in transition.
   * @param timeout How long to wait.
   */
  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
    waitFor(timeout, predicateNoRegionsInTransition());
  }

  /**
   * Wait until no regions in transition. (time limit 15min)
   */
  public void waitUntilNoRegionsInTransition() throws IOException {
    waitUntilNoRegionsInTransition(15 * 60000);
  }

  /**
   * Wait until labels is ready in VisibilityLabelsCache.
   */
  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {

      @Override
      public boolean evaluate() {
        // Ordinal 0 means the label is not yet registered in the cache.
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() {
        for (String label : labels) {
          if (labelsCache.getLabelOrdinal(label) == 0) {
            return label + " is not available yet";
          }
        }
        return "";
      }
    });
  }

  /**
   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
   * available.
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
    return generateColumnDescriptors("");
  }

  /**
   * Create a set of column descriptors with the combination of compression, encoding, bloom codecs
   * available.
   * @param prefix family names prefix
   * @return the list of column descriptors
   */
  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    long familyId = 0;
    // One descriptor per (compression, encoding, bloom) combination.
    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
        for (BloomType bloomType : BloomType.values()) {
          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
          familyId++;
        }
      }
    }
    return columnFamilyDescriptors;
  }

  /**
   * Get supported compression algorithms.
   * @return supported compression algorithms.
4073 */ 4074 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 4075 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 4076 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 4077 for (String algoName : allAlgos) { 4078 try { 4079 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 4080 algo.getCompressor(); 4081 supportedAlgos.add(algo); 4082 } catch (Throwable t) { 4083 // this algo is not available 4084 } 4085 } 4086 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 4087 } 4088 4089 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 4090 Scan scan = new Scan().withStartRow(row); 4091 scan.setReadType(ReadType.PREAD); 4092 scan.setCaching(1); 4093 scan.setReversed(true); 4094 scan.addFamily(family); 4095 try (RegionScanner scanner = r.getScanner(scan)) { 4096 List<Cell> cells = new ArrayList<>(1); 4097 scanner.next(cells); 4098 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 4099 return null; 4100 } 4101 return Result.create(cells); 4102 } 4103 } 4104 4105 private boolean isTargetTable(final byte[] inRow, Cell c) { 4106 String inputRowString = Bytes.toString(inRow); 4107 int i = inputRowString.indexOf(HConstants.DELIMITER); 4108 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 4109 int o = outputRowString.indexOf(HConstants.DELIMITER); 4110 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 4111 } 4112 4113 /** 4114 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 4115 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 4116 * kerby KDC server and utility for using it, 4117 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 4118 * less baggage. It came in in HBASE-5291. 
4119 */ 4120 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 4121 Properties conf = MiniKdc.createConf(); 4122 conf.put(MiniKdc.DEBUG, true); 4123 MiniKdc kdc = null; 4124 File dir = null; 4125 // There is time lag between selecting a port and trying to bind with it. It's possible that 4126 // another service captures the port in between which'll result in BindException. 4127 boolean bindException; 4128 int numTries = 0; 4129 do { 4130 try { 4131 bindException = false; 4132 dir = new File(getDataTestDir("kdc").toUri().getPath()); 4133 kdc = new MiniKdc(conf, dir); 4134 kdc.start(); 4135 } catch (BindException e) { 4136 FileUtils.deleteDirectory(dir); // clean directory 4137 numTries++; 4138 if (numTries == 3) { 4139 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 4140 throw e; 4141 } 4142 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 4143 bindException = true; 4144 } 4145 } while (bindException); 4146 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 4147 return kdc; 4148 } 4149 4150 public int getNumHFiles(final TableName tableName, final byte[] family) { 4151 int numHFiles = 0; 4152 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 4153 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 4154 } 4155 return numHFiles; 4156 } 4157 4158 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 4159 final byte[] family) { 4160 int numHFiles = 0; 4161 for (Region region : rs.getRegions(tableName)) { 4162 numHFiles += region.getStore(family).getStorefilesCount(); 4163 } 4164 return numHFiles; 4165 } 4166 4167 private void assertEquals(String message, int expected, int actual) { 4168 if (expected == actual) { 4169 return; 4170 } 4171 String formatted = ""; 4172 if (message != null && !"".equals(message)) { 4173 formatted = message + " "; 4174 } 4175 throw new 
AssertionError(formatted + "expected:<" + expected + "> but was:<" + actual + ">"); 4176 } 4177 4178 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 4179 if (ltd.getValues().hashCode() != rtd.getValues().hashCode()) { 4180 throw new AssertionError(); 4181 } 4182 assertEquals("", ltd.getValues().hashCode(), rtd.getValues().hashCode()); 4183 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 4184 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 4185 assertEquals("", ltdFamilies.size(), rtdFamilies.size()); 4186 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 4187 it2 = rtdFamilies.iterator(); it.hasNext();) { 4188 assertEquals("", 0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 4189 } 4190 } 4191 4192 /** 4193 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 4194 * invocations. 4195 */ 4196 public static void await(final long sleepMillis, final BooleanSupplier condition) 4197 throws InterruptedException { 4198 try { 4199 while (!condition.getAsBoolean()) { 4200 Thread.sleep(sleepMillis); 4201 } 4202 } catch (RuntimeException e) { 4203 if (e.getCause() instanceof AssertionError) { 4204 throw (AssertionError) e.getCause(); 4205 } 4206 throw e; 4207 } 4208 } 4209}