001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.RegionInfoBuilder; 029import org.apache.hadoop.hbase.client.RegionReplicaUtil; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.JVMClusterUtil; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 047 048/** 049 * This class creates a single process HBase cluster. each server. The master uses the 'default' 050 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 051 * instance each and will close down their instance on the way out. 052 * @deprecated since 3.0.0, will be removed in 4.0.0. Use 053 * {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead. 054 */ 055@InterfaceAudience.Public 056@Deprecated 057public class MiniHBaseCluster extends HBaseCluster { 058 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 */ 067 public MiniHBaseCluster(Configuration conf, int numRegionServers) 068 throws IOException, InterruptedException { 069 this(conf, 1, numRegionServers); 070 } 071 072 /** 073 * Start a MiniHBaseCluster. 074 * @param conf Configuration to be used for cluster 075 * @param numMasters initial number of masters to start. 076 * @param numRegionServers initial number of region servers to start. 077 */ 078 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 079 throws IOException, InterruptedException { 080 this(conf, numMasters, numRegionServers, null, null); 081 } 082 083 /** 084 * Start a MiniHBaseCluster. 085 * @param conf Configuration to be used for cluster 086 * @param numMasters initial number of masters to start. 087 * @param numRegionServers initial number of region servers to start. 088 */ 089 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 090 Class<? extends HMaster> masterClass, 091 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 092 throws IOException, InterruptedException { 093 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 094 } 095 096 /** 097 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 098 * restart where for sure the regionservers come up on same address+port (but just 099 * with different startcode); by default mini hbase clusters choose new arbitrary 100 * ports on each cluster start. 101 */ 102 public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 103 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 104 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 105 throws IOException, InterruptedException { 106 super(conf); 107 108 // Hadoop 2 109 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 110 111 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 112 regionserverClass); 113 this.initialClusterStatus = getClusterMetrics(); 114 } 115 116 public Configuration getConfiguration() { 117 return this.conf; 118 } 119 120 /** 121 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 122 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 123 * only, not All filesystems as the FileSystem system exit hook does. 124 */ 125 public static class MiniHBaseClusterRegionServer extends HRegionServer { 126 private Thread shutdownThread = null; 127 private User user = null; 128 /** 129 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 130 * restarted instances of the same server will have different ServerName and will not coincide 131 * with past dead ones. So there's no need to cleanup this list. 132 */ 133 static Set<ServerName> killedServers = new HashSet<>(); 134 135 public MiniHBaseClusterRegionServer(Configuration conf) 136 throws IOException, InterruptedException { 137 super(conf); 138 this.user = User.getCurrent(); 139 } 140 141 /* 142 * @param currentfs We return this if we did not make a new one. 143 * @param uniqueName Same name used to help identify the created fs. 144 * @return A new fs instance if we are up on DistributeFileSystem. 145 */ 146 147 @Override 148 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 149 throws IOException { 150 super.handleReportForDutyResponse(c); 151 // Run this thread to shutdown our filesystem on way out. 152 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 153 } 154 155 @Override 156 public void run() { 157 try { 158 this.user.runAs(new PrivilegedAction<Object>() { 159 @Override 160 public Object run() { 161 runRegionServer(); 162 return null; 163 } 164 }); 165 } catch (Throwable t) { 166 LOG.error("Exception in run", t); 167 } finally { 168 // Run this on the way out. 169 if (this.shutdownThread != null) { 170 this.shutdownThread.start(); 171 Threads.shutdown(this.shutdownThread, 30000); 172 } 173 } 174 } 175 176 private void runRegionServer() { 177 super.run(); 178 } 179 180 @Override 181 protected void kill() { 182 killedServers.add(getServerName()); 183 super.kill(); 184 } 185 186 @Override 187 public void abort(final String reason, final Throwable cause) { 188 this.user.runAs(new PrivilegedAction<Object>() { 189 @Override 190 public Object run() { 191 abortRegionServer(reason, cause); 192 return null; 193 } 194 }); 195 } 196 197 private void abortRegionServer(String reason, Throwable cause) { 198 super.abort(reason, cause); 199 } 200 } 201 202 /** 203 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook 204 * does. 205 */ 206 static class SingleFileSystemShutdownThread extends Thread { 207 private final FileSystem fs; 208 209 SingleFileSystemShutdownThread(final FileSystem fs) { 210 super("Shutdown of " + fs); 211 this.fs = fs; 212 } 213 214 @Override 215 public void run() { 216 try { 217 LOG.info("Hook closing fs=" + this.fs); 218 this.fs.close(); 219 } catch (IOException e) { 220 LOG.warn("Running hook", e); 221 } 222 } 223 } 224 225 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 226 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 227 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 228 throws IOException, InterruptedException { 229 try { 230 if (masterClass == null) { 231 masterClass = HMaster.class; 232 } 233 if (regionserverClass == null) { 234 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 235 } 236 237 // start up a LocalHBaseCluster 238 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 239 masterClass, regionserverClass); 240 241 // manually add the regionservers as other users 242 for (int i = 0; i < nRegionNodes; i++) { 243 Configuration rsConf = HBaseConfiguration.create(conf); 244 if (rsPorts != null) { 245 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 246 } 247 User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++); 248 hbaseCluster.addRegionServer(rsConf, i, user); 249 } 250 251 hbaseCluster.startup(); 252 } catch (IOException e) { 253 shutdown(); 254 throw e; 255 } catch (Throwable t) { 256 LOG.error("Error starting cluster", t); 257 shutdown(); 258 throw new IOException("Shutting down", t); 259 } 260 } 261 262 @Override 263 public void startRegionServer(String hostname, int port) throws IOException { 264 final Configuration newConf = HBaseConfiguration.create(conf); 265 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 266 startRegionServer(newConf); 267 } 268 269 @Override 270 public void killRegionServer(ServerName serverName) throws IOException { 271 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 272 if (server instanceof MiniHBaseClusterRegionServer) { 273 LOG.info("Killing " + server.toString()); 274 ((MiniHBaseClusterRegionServer) server).kill(); 275 } else { 276 abortRegionServer(getRegionServerIndex(serverName)); 277 } 278 } 279 280 @Override 281 public boolean isKilledRS(ServerName serverName) { 282 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 283 } 284 285 @Override 286 public void stopRegionServer(ServerName serverName) throws IOException { 287 stopRegionServer(getRegionServerIndex(serverName)); 288 } 289 290 @Override 291 public void suspendRegionServer(ServerName serverName) throws IOException { 292 suspendRegionServer(getRegionServerIndex(serverName)); 293 } 294 295 @Override 296 public void resumeRegionServer(ServerName serverName) throws IOException { 297 resumeRegionServer(getRegionServerIndex(serverName)); 298 } 299 300 @Override 301 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 302 // ignore timeout for now 303 waitOnRegionServer(getRegionServerIndex(serverName)); 304 } 305 306 @Override 307 public void startZkNode(String hostname, int port) throws IOException { 308 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 309 } 310 311 @Override 312 public void killZkNode(ServerName serverName) throws IOException { 313 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 314 } 315 316 @Override 317 public void stopZkNode(ServerName serverName) throws IOException { 318 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 319 } 320 321 @Override 322 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 323 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 324 } 325 326 @Override 327 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 328 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 329 } 330 331 @Override 332 public void startDataNode(ServerName serverName) throws IOException { 333 LOG.warn("Starting datanodes on mini cluster is not supported"); 334 } 335 336 @Override 337 public void killDataNode(ServerName serverName) throws IOException { 338 LOG.warn("Aborting datanodes on mini cluster is not supported"); 339 } 340 341 @Override 342 public void stopDataNode(ServerName serverName) throws IOException { 343 LOG.warn("Stopping datanodes on mini cluster is not supported"); 344 } 345 346 @Override 347 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 348 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 349 } 350 351 @Override 352 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 353 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 354 } 355 356 @Override 357 public void startNameNode(ServerName serverName) throws IOException { 358 LOG.warn("Starting namenodes on mini cluster is not supported"); 359 } 360 361 @Override 362 public void killNameNode(ServerName serverName) throws IOException { 363 LOG.warn("Aborting namenodes on mini cluster is not supported"); 364 } 365 366 @Override 367 public void stopNameNode(ServerName serverName) throws IOException { 368 LOG.warn("Stopping namenodes on mini cluster is not supported"); 369 } 370 371 @Override 372 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 373 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 374 } 375 376 @Override 377 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 378 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 379 } 380 381 @Override 382 public void startMaster(String hostname, int port) throws IOException { 383 this.startMaster(); 384 } 385 386 @Override 387 public void killMaster(ServerName serverName) throws IOException { 388 abortMaster(getMasterIndex(serverName)); 389 } 390 391 @Override 392 public void stopMaster(ServerName serverName) throws IOException { 393 stopMaster(getMasterIndex(serverName)); 394 } 395 396 @Override 397 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 398 // ignore timeout for now 399 waitOnMaster(getMasterIndex(serverName)); 400 } 401 402 /** 403 * Starts a region server thread running 404 * @return New RegionServerThread 405 */ 406 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 407 final Configuration newConf = HBaseConfiguration.create(conf); 408 return startRegionServer(newConf); 409 } 410 411 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 412 throws IOException { 413 User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++); 414 JVMClusterUtil.RegionServerThread t = null; 415 try { 416 t = 417 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 418 t.start(); 419 t.waitForServerOnline(); 420 } catch (InterruptedException ie) { 421 throw new IOException("Interrupted adding regionserver to cluster", ie); 422 } 423 return t; 424 } 425 426 /** 427 * Starts a region server thread and waits until its processed by master. Throws an exception when 428 * it can't start a region server or when the region server is not processed by master within the 429 * timeout. 430 * @return New RegionServerThread 431 */ 432 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 433 throws IOException { 434 435 JVMClusterUtil.RegionServerThread t = startRegionServer(); 436 ServerName rsServerName = t.getRegionServer().getServerName(); 437 438 long start = EnvironmentEdgeManager.currentTime(); 439 ClusterMetrics clusterStatus = getClusterMetrics(); 440 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 441 if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) { 442 return t; 443 } 444 Threads.sleep(100); 445 } 446 if (t.getRegionServer().isOnline()) { 447 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 448 } else { 449 throw new IOException("RS: " + rsServerName + " is offline"); 450 } 451 } 452 453 /** 454 * Cause a region server to exit doing basic clean up only on its way out. 455 * @param serverNumber Used as index into a list. 456 */ 457 public String abortRegionServer(int serverNumber) { 458 HRegionServer server = getRegionServer(serverNumber); 459 LOG.info("Aborting " + server.toString()); 460 server.abort("Aborting for tests", new Exception("Trace info")); 461 return server.toString(); 462 } 463 464 /** 465 * Shut down the specified region server cleanly 466 * @param serverNumber Used as index into a list. 467 * @return the region server that was stopped 468 */ 469 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 470 return stopRegionServer(serverNumber, true); 471 } 472 473 /** 474 * Shut down the specified region server cleanly 475 * @param serverNumber Used as index into a list. 476 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 477 * shutdown. Usually we do but you do not want to do this if you are running 478 * multiple regionservers in a test and you shut down one before end of the 479 * test. 480 * @return the region server that was stopped 481 */ 482 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 483 final boolean shutdownFS) { 484 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 485 LOG.info("Stopping " + server.toString()); 486 server.getRegionServer().stop("Stopping rs " + serverNumber); 487 return server; 488 } 489 490 /** 491 * Suspend the specified region server 492 * @param serverNumber Used as index into a list. 493 */ 494 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 495 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 496 LOG.info("Suspending {}", server.toString()); 497 server.suspend(); 498 return server; 499 } 500 501 /** 502 * Resume the specified region server 503 * @param serverNumber Used as index into a list. 504 */ 505 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 506 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 507 LOG.info("Resuming {}", server.toString()); 508 server.resume(); 509 return server; 510 } 511 512 /** 513 * Wait for the specified region server to stop. Removes this thread from list of running threads. 514 * @return Name of region server that just went down. 515 */ 516 public String waitOnRegionServer(final int serverNumber) { 517 return this.hbaseCluster.waitOnRegionServer(serverNumber); 518 } 519 520 /** 521 * Starts a master thread running 522 * @return New RegionServerThread 523 */ 524 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 525 value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", 526 justification = "Testing only, not a big deal") 527 public JVMClusterUtil.MasterThread startMaster() throws IOException { 528 Configuration c = HBaseConfiguration.create(conf); 529 User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++); 530 531 JVMClusterUtil.MasterThread t = null; 532 try { 533 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 534 t.start(); 535 } catch (InterruptedException ie) { 536 throw new IOException("Interrupted adding master to cluster", ie); 537 } 538 conf.set(HConstants.MASTER_ADDRS_KEY, 539 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 540 return t; 541 } 542 543 /** 544 * Returns the current active master, if available. 545 * @return the active HMaster, null if none is active. 546 */ 547 public HMaster getMaster() { 548 return this.hbaseCluster.getActiveMaster(); 549 } 550 551 /** 552 * Returns the current active master thread, if available. 553 * @return the active MasterThread, null if none is active. 554 */ 555 public MasterThread getMasterThread() { 556 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 557 if (mt.getMaster().isActiveMaster()) { 558 return mt; 559 } 560 } 561 return null; 562 } 563 564 /** 565 * Returns the master at the specified index, if available. 566 * @return the active HMaster, null if none is active. 567 */ 568 public HMaster getMaster(final int serverNumber) { 569 return this.hbaseCluster.getMaster(serverNumber); 570 } 571 572 /** 573 * Cause a master to exit without shutting down entire cluster. 574 * @param serverNumber Used as index into a list. 575 */ 576 public String abortMaster(int serverNumber) { 577 HMaster server = getMaster(serverNumber); 578 LOG.info("Aborting " + server.toString()); 579 server.abort("Aborting for tests", new Exception("Trace info")); 580 return server.toString(); 581 } 582 583 /** 584 * Shut down the specified master cleanly 585 * @param serverNumber Used as index into a list. 586 * @return the region server that was stopped 587 */ 588 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 589 return stopMaster(serverNumber, true); 590 } 591 592 /** 593 * Shut down the specified master cleanly 594 * @param serverNumber Used as index into a list. 595 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 596 * shutdown. Usually we do but you do not want to do this if you are running 597 * multiple master in a test and you shut down one before end of the test. 598 * @return the master that was stopped 599 */ 600 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 601 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 602 LOG.info("Stopping " + server.toString()); 603 server.getMaster().stop("Stopping master " + serverNumber); 604 return server; 605 } 606 607 /** 608 * Wait for the specified master to stop. Removes this thread from list of running threads. 609 * @return Name of master that just went down. 610 */ 611 public String waitOnMaster(final int serverNumber) { 612 return this.hbaseCluster.waitOnMaster(serverNumber); 613 } 614 615 /** 616 * Blocks until there is an active master and that master has completed initialization. 617 * @return true if an active master becomes available. false if there are no masters left. 618 */ 619 @Override 620 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 621 List<JVMClusterUtil.MasterThread> mts; 622 long start = EnvironmentEdgeManager.currentTime(); 623 while ( 624 !(mts = getMasterThreads()).isEmpty() 625 && (EnvironmentEdgeManager.currentTime() - start) < timeout 626 ) { 627 for (JVMClusterUtil.MasterThread mt : mts) { 628 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 629 return true; 630 } 631 } 632 633 Threads.sleep(100); 634 } 635 return false; 636 } 637 638 /** Returns List of master threads. */ 639 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 640 return this.hbaseCluster.getMasters(); 641 } 642 643 /** Returns List of live master threads (skips the aborted and the killed) */ 644 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 645 return this.hbaseCluster.getLiveMasters(); 646 } 647 648 /** 649 * Wait for Mini HBase Cluster to shut down. 650 */ 651 public void join() { 652 this.hbaseCluster.join(); 653 } 654 655 /** 656 * Shut down the mini HBase cluster 657 */ 658 @Override 659 public void shutdown() throws IOException { 660 if (this.hbaseCluster != null) { 661 this.hbaseCluster.shutdown(); 662 } 663 } 664 665 @Override 666 public void close() throws IOException { 667 } 668 669 @Override 670 public ClusterMetrics getClusterMetrics() throws IOException { 671 HMaster master = getMaster(); 672 return master == null ? null : master.getClusterMetrics(); 673 } 674 675 private void executeFlush(HRegion region) throws IOException { 676 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 677 return; 678 } 679 // retry 5 times if we can not flush 680 for (int i = 0; i < 5; i++) { 681 FlushResult result = region.flush(true); 682 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 683 return; 684 } 685 Threads.sleep(1000); 686 } 687 } 688 689 /** 690 * Call flushCache on all regions on all participating regionservers. 691 */ 692 public void flushcache() throws IOException { 693 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 694 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 695 executeFlush(r); 696 } 697 } 698 } 699 700 /** 701 * Call flushCache on all regions of the specified table. 702 */ 703 public void flushcache(TableName tableName) throws IOException { 704 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 705 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 706 if (r.getTableDescriptor().getTableName().equals(tableName)) { 707 executeFlush(r); 708 } 709 } 710 } 711 } 712 713 /** 714 * Call flushCache on all regions on all participating regionservers. 715 */ 716 public void compact(boolean major) throws IOException { 717 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 718 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 719 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 720 r.compact(major); 721 } 722 } 723 } 724 } 725 726 /** 727 * Call flushCache on all regions of the specified table. 728 */ 729 public void compact(TableName tableName, boolean major) throws IOException { 730 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 731 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 732 if (r.getTableDescriptor().getTableName().equals(tableName)) { 733 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 734 r.compact(major); 735 } 736 } 737 } 738 } 739 } 740 741 /** Returns Number of live region servers in the cluster currently. */ 742 public int getNumLiveRegionServers() { 743 return this.hbaseCluster.getLiveRegionServers().size(); 744 } 745 746 /** 747 * @return List of region server threads. Does not return the master even though it is also a 748 * region server. 749 */ 750 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 751 return this.hbaseCluster.getRegionServers(); 752 } 753 754 /** Returns List of live region server threads (skips the aborted and the killed) */ 755 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 756 return this.hbaseCluster.getLiveRegionServers(); 757 } 758 759 /** 760 * Grab a numbered region server of your choice. 761 * @return region server 762 */ 763 public HRegionServer getRegionServer(int serverNumber) { 764 return hbaseCluster.getRegionServer(serverNumber); 765 } 766 767 public HRegionServer getRegionServer(ServerName serverName) { 768 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 769 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 770 } 771 772 public List<HRegion> getRegions(byte[] tableName) { 773 return getRegions(TableName.valueOf(tableName)); 774 } 775 776 public List<HRegion> getRegions(TableName tableName) { 777 List<HRegion> ret = new ArrayList<>(); 778 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 779 HRegionServer hrs = rst.getRegionServer(); 780 for (Region region : hrs.getOnlineRegionsLocalContext()) { 781 if (region.getTableDescriptor().getTableName().equals(tableName)) { 782 ret.add((HRegion) region); 783 } 784 } 785 } 786 return ret; 787 } 788 789 /** 790 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 791 * regionName. Returns -1 if none found. 792 */ 793 public int getServerWithMeta() { 794 return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 795 } 796 797 /** 798 * Get the location of the specified region 799 * @param regionName Name of the region in bytes 800 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 801 * hbase:meta. Returns -1 if none found. 802 */ 803 public int getServerWith(byte[] regionName) { 804 int index = 0; 805 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 806 HRegionServer hrs = rst.getRegionServer(); 807 if (!hrs.isStopped()) { 808 Region region = hrs.getOnlineRegion(regionName); 809 if (region != null) { 810 return index; 811 } 812 } 813 index++; 814 } 815 return -1; 816 } 817 818 @Override 819 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 820 throws IOException { 821 int index = getServerWith(regionName); 822 if (index < 0) { 823 return null; 824 } 825 return getRegionServer(index).getServerName(); 826 } 827 828 /** 829 * Counts the total numbers of regions being served by the currently online region servers by 830 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 831 * catalog tables. 832 * @return number of regions being served by all region servers 833 */ 834 public long countServedRegions() { 835 long count = 0; 836 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 837 count += rst.getRegionServer().getNumberOfOnlineRegions(); 838 } 839 return count; 840 } 841 842 /** 843 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 844 * mini-cluster back for clean shutdown. 845 */ 846 public void killAll() { 847 // Do backups first. 848 MasterThread activeMaster = null; 849 for (MasterThread masterThread : getMasterThreads()) { 850 if (!masterThread.getMaster().isActiveMaster()) { 851 masterThread.getMaster().abort("killAll"); 852 } else { 853 activeMaster = masterThread; 854 } 855 } 856 // Do active after. 857 if (activeMaster != null) { 858 activeMaster.getMaster().abort("killAll"); 859 } 860 for (RegionServerThread rst : getRegionServerThreads()) { 861 rst.getRegionServer().abort("killAll"); 862 } 863 } 864 865 @Override 866 public void waitUntilShutDown() { 867 this.hbaseCluster.join(); 868 } 869 870 public List<HRegion> findRegionsForTable(TableName tableName) { 871 ArrayList<HRegion> ret = new ArrayList<>(); 872 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 873 HRegionServer hrs = rst.getRegionServer(); 874 for (Region region : hrs.getRegions(tableName)) { 875 if (region.getTableDescriptor().getTableName().equals(tableName)) { 876 ret.add((HRegion) region); 877 } 878 } 879 } 880 return ret; 881 } 882 883 protected int getRegionServerIndex(ServerName serverName) { 884 // we have a small number of region servers, this should be fine for now. 885 List<RegionServerThread> servers = getRegionServerThreads(); 886 for (int i = 0; i < servers.size(); i++) { 887 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 888 return i; 889 } 890 } 891 return -1; 892 } 893 894 protected int getMasterIndex(ServerName serverName) { 895 List<MasterThread> masters = getMasterThreads(); 896 for (int i = 0; i < masters.size(); i++) { 897 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 898 return i; 899 } 900 } 901 return -1; 902 } 903}