001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.RegionInfoBuilder; 029import org.apache.hadoop.hbase.client.RegionReplicaUtil; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.JVMClusterUtil; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 047 048/** 049 * This class creates a single process HBase cluster. each server. The master uses the 'default' 050 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 051 * instance each and will close down their instance on the way out. 052 * @deprecated since 3.0.0, will be removed in 4.0.0. Use 053 * {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead. 054 */ 055@InterfaceAudience.Public 056@Deprecated 057public class MiniHBaseCluster extends HBaseCluster { 058 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. n 066 */ 067 public MiniHBaseCluster(Configuration conf, int numRegionServers) 068 throws IOException, InterruptedException { 069 this(conf, 1, numRegionServers); 070 } 071 072 /** 073 * Start a MiniHBaseCluster. 074 * @param conf Configuration to be used for cluster 075 * @param numMasters initial number of masters to start. 076 * @param numRegionServers initial number of region servers to start. n 077 */ 078 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 079 throws IOException, InterruptedException { 080 this(conf, numMasters, numRegionServers, null, null); 081 } 082 083 /** 084 * Start a MiniHBaseCluster. 085 * @param conf Configuration to be used for cluster 086 * @param numMasters initial number of masters to start. 087 * @param numRegionServers initial number of region servers to start. 088 */ 089 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 090 Class<? extends HMaster> masterClass, 091 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 092 throws IOException, InterruptedException { 093 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 094 } 095 096 /** 097 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 098 * restart where for sure the regionservers come up on same address+port (but just 099 * with different startcode); by default mini hbase clusters choose new arbitrary 100 * ports on each cluster start. nn 101 */ 102 public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 103 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 104 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 105 throws IOException, InterruptedException { 106 super(conf); 107 108 // Hadoop 2 109 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 110 111 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 112 regionserverClass); 113 this.initialClusterStatus = getClusterMetrics(); 114 } 115 116 public Configuration getConfiguration() { 117 return this.conf; 118 } 119 120 /** 121 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 122 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 123 * only, not All filesystems as the FileSystem system exit hook does. 124 */ 125 public static class MiniHBaseClusterRegionServer extends HRegionServer { 126 private Thread shutdownThread = null; 127 private User user = null; 128 /** 129 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 130 * restarted instances of the same server will have different ServerName and will not coincide 131 * with past dead ones. So there's no need to cleanup this list. 132 */ 133 static Set<ServerName> killedServers = new HashSet<>(); 134 135 public MiniHBaseClusterRegionServer(Configuration conf) 136 throws IOException, InterruptedException { 137 super(conf); 138 this.user = User.getCurrent(); 139 } 140 141 /* 142 * n * @param currentfs We return this if we did not make a new one. 143 * @param uniqueName Same name used to help identify the created fs. 144 * @return A new fs instance if we are up on DistributeFileSystem. n 145 */ 146 147 @Override 148 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 149 throws IOException { 150 super.handleReportForDutyResponse(c); 151 // Run this thread to shutdown our filesystem on way out. 152 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 153 } 154 155 @Override 156 public void run() { 157 try { 158 this.user.runAs(new PrivilegedAction<Object>() { 159 @Override 160 public Object run() { 161 runRegionServer(); 162 return null; 163 } 164 }); 165 } catch (Throwable t) { 166 LOG.error("Exception in run", t); 167 } finally { 168 // Run this on the way out. 169 if (this.shutdownThread != null) { 170 this.shutdownThread.start(); 171 Threads.shutdown(this.shutdownThread, 30000); 172 } 173 } 174 } 175 176 private void runRegionServer() { 177 super.run(); 178 } 179 180 @Override 181 protected void kill() { 182 killedServers.add(getServerName()); 183 super.kill(); 184 } 185 186 @Override 187 public void abort(final String reason, final Throwable cause) { 188 this.user.runAs(new PrivilegedAction<Object>() { 189 @Override 190 public Object run() { 191 abortRegionServer(reason, cause); 192 return null; 193 } 194 }); 195 } 196 197 private void abortRegionServer(String reason, Throwable cause) { 198 super.abort(reason, cause); 199 } 200 } 201 202 /** 203 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook 204 * does. 205 */ 206 static class SingleFileSystemShutdownThread extends Thread { 207 private final FileSystem fs; 208 209 SingleFileSystemShutdownThread(final FileSystem fs) { 210 super("Shutdown of " + fs); 211 this.fs = fs; 212 } 213 214 @Override 215 public void run() { 216 try { 217 LOG.info("Hook closing fs=" + this.fs); 218 this.fs.close(); 219 } catch (NullPointerException npe) { 220 LOG.debug("Need to fix these: " + npe.toString()); 221 } catch (IOException e) { 222 LOG.warn("Running hook", e); 223 } 224 } 225 } 226 227 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 228 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 229 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 230 throws IOException, InterruptedException { 231 try { 232 if (masterClass == null) { 233 masterClass = HMaster.class; 234 } 235 if (regionserverClass == null) { 236 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 237 } 238 239 // start up a LocalHBaseCluster 240 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 241 masterClass, regionserverClass); 242 243 // manually add the regionservers as other users 244 for (int i = 0; i < nRegionNodes; i++) { 245 Configuration rsConf = HBaseConfiguration.create(conf); 246 if (rsPorts != null) { 247 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 248 } 249 User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++); 250 hbaseCluster.addRegionServer(rsConf, i, user); 251 } 252 253 hbaseCluster.startup(); 254 } catch (IOException e) { 255 shutdown(); 256 throw e; 257 } catch (Throwable t) { 258 LOG.error("Error starting cluster", t); 259 shutdown(); 260 throw new IOException("Shutting down", t); 261 } 262 } 263 264 @Override 265 public void startRegionServer(String hostname, int port) throws IOException { 266 final Configuration newConf = HBaseConfiguration.create(conf); 267 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 268 startRegionServer(newConf); 269 } 270 271 @Override 272 public void killRegionServer(ServerName serverName) throws IOException { 273 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 274 if (server instanceof MiniHBaseClusterRegionServer) { 275 LOG.info("Killing " + server.toString()); 276 ((MiniHBaseClusterRegionServer) server).kill(); 277 } else { 278 abortRegionServer(getRegionServerIndex(serverName)); 279 } 280 } 281 282 @Override 283 public boolean isKilledRS(ServerName serverName) { 284 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 285 } 286 287 @Override 288 public void stopRegionServer(ServerName serverName) throws IOException { 289 stopRegionServer(getRegionServerIndex(serverName)); 290 } 291 292 @Override 293 public void suspendRegionServer(ServerName serverName) throws IOException { 294 suspendRegionServer(getRegionServerIndex(serverName)); 295 } 296 297 @Override 298 public void resumeRegionServer(ServerName serverName) throws IOException { 299 resumeRegionServer(getRegionServerIndex(serverName)); 300 } 301 302 @Override 303 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 304 // ignore timeout for now 305 waitOnRegionServer(getRegionServerIndex(serverName)); 306 } 307 308 @Override 309 public void startZkNode(String hostname, int port) throws IOException { 310 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 311 } 312 313 @Override 314 public void killZkNode(ServerName serverName) throws IOException { 315 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 316 } 317 318 @Override 319 public void stopZkNode(ServerName serverName) throws IOException { 320 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 321 } 322 323 @Override 324 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 325 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 326 } 327 328 @Override 329 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 330 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 331 } 332 333 @Override 334 public void startDataNode(ServerName serverName) throws IOException { 335 LOG.warn("Starting datanodes on mini cluster is not supported"); 336 } 337 338 @Override 339 public void killDataNode(ServerName serverName) throws IOException { 340 LOG.warn("Aborting datanodes on mini cluster is not supported"); 341 } 342 343 @Override 344 public void stopDataNode(ServerName serverName) throws IOException { 345 LOG.warn("Stopping datanodes on mini cluster is not supported"); 346 } 347 348 @Override 349 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 350 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 351 } 352 353 @Override 354 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 355 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 356 } 357 358 @Override 359 public void startNameNode(ServerName serverName) throws IOException { 360 LOG.warn("Starting namenodes on mini cluster is not supported"); 361 } 362 363 @Override 364 public void killNameNode(ServerName serverName) throws IOException { 365 LOG.warn("Aborting namenodes on mini cluster is not supported"); 366 } 367 368 @Override 369 public void stopNameNode(ServerName serverName) throws IOException { 370 LOG.warn("Stopping namenodes on mini cluster is not supported"); 371 } 372 373 @Override 374 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 375 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 376 } 377 378 @Override 379 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 380 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 381 } 382 383 @Override 384 public void startMaster(String hostname, int port) throws IOException { 385 this.startMaster(); 386 } 387 388 @Override 389 public void killMaster(ServerName serverName) throws IOException { 390 abortMaster(getMasterIndex(serverName)); 391 } 392 393 @Override 394 public void stopMaster(ServerName serverName) throws IOException { 395 stopMaster(getMasterIndex(serverName)); 396 } 397 398 @Override 399 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 400 // ignore timeout for now 401 waitOnMaster(getMasterIndex(serverName)); 402 } 403 404 /** 405 * Starts a region server thread running n * @return New RegionServerThread 406 */ 407 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 408 final Configuration newConf = HBaseConfiguration.create(conf); 409 return startRegionServer(newConf); 410 } 411 412 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 413 throws IOException { 414 User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++); 415 JVMClusterUtil.RegionServerThread t = null; 416 try { 417 t = 418 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 419 t.start(); 420 t.waitForServerOnline(); 421 } catch (InterruptedException ie) { 422 throw new IOException("Interrupted adding regionserver to cluster", ie); 423 } 424 return t; 425 } 426 427 /** 428 * Starts a region server thread and waits until its processed by master. Throws an exception when 429 * it can't start a region server or when the region server is not processed by master within the 430 * timeout. 431 * @return New RegionServerThread 432 */ 433 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 434 throws IOException { 435 436 JVMClusterUtil.RegionServerThread t = startRegionServer(); 437 ServerName rsServerName = t.getRegionServer().getServerName(); 438 439 long start = EnvironmentEdgeManager.currentTime(); 440 ClusterMetrics clusterStatus = getClusterMetrics(); 441 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 442 if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) { 443 return t; 444 } 445 Threads.sleep(100); 446 } 447 if (t.getRegionServer().isOnline()) { 448 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 449 } else { 450 throw new IOException("RS: " + rsServerName + " is offline"); 451 } 452 } 453 454 /** 455 * Cause a region server to exit doing basic clean up only on its way out. 456 * @param serverNumber Used as index into a list. 457 */ 458 public String abortRegionServer(int serverNumber) { 459 HRegionServer server = getRegionServer(serverNumber); 460 LOG.info("Aborting " + server.toString()); 461 server.abort("Aborting for tests", new Exception("Trace info")); 462 return server.toString(); 463 } 464 465 /** 466 * Shut down the specified region server cleanly 467 * @param serverNumber Used as index into a list. 468 * @return the region server that was stopped 469 */ 470 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 471 return stopRegionServer(serverNumber, true); 472 } 473 474 /** 475 * Shut down the specified region server cleanly 476 * @param serverNumber Used as index into a list. 477 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 478 * shutdown. Usually we do but you do not want to do this if you are running 479 * multiple regionservers in a test and you shut down one before end of the 480 * test. 481 * @return the region server that was stopped 482 */ 483 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 484 final boolean shutdownFS) { 485 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 486 LOG.info("Stopping " + server.toString()); 487 server.getRegionServer().stop("Stopping rs " + serverNumber); 488 return server; 489 } 490 491 /** 492 * Suspend the specified region server 493 * @param serverNumber Used as index into a list. 494 */ 495 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 496 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 497 LOG.info("Suspending {}", server.toString()); 498 server.suspend(); 499 return server; 500 } 501 502 /** 503 * Resume the specified region server 504 * @param serverNumber Used as index into a list. 505 */ 506 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 507 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 508 LOG.info("Resuming {}", server.toString()); 509 server.resume(); 510 return server; 511 } 512 513 /** 514 * Wait for the specified region server to stop. Removes this thread from list of running threads. 515 * n * @return Name of region server that just went down. 516 */ 517 public String waitOnRegionServer(final int serverNumber) { 518 return this.hbaseCluster.waitOnRegionServer(serverNumber); 519 } 520 521 /** 522 * Starts a master thread running 523 * @return New RegionServerThread 524 */ 525 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 526 value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", 527 justification = "Testing only, not a big deal") 528 public JVMClusterUtil.MasterThread startMaster() throws IOException { 529 Configuration c = HBaseConfiguration.create(conf); 530 User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++); 531 532 JVMClusterUtil.MasterThread t = null; 533 try { 534 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 535 t.start(); 536 } catch (InterruptedException ie) { 537 throw new IOException("Interrupted adding master to cluster", ie); 538 } 539 conf.set(HConstants.MASTER_ADDRS_KEY, 540 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 541 return t; 542 } 543 544 /** 545 * Returns the current active master, if available. 546 * @return the active HMaster, null if none is active. 547 */ 548 public HMaster getMaster() { 549 return this.hbaseCluster.getActiveMaster(); 550 } 551 552 /** 553 * Returns the current active master thread, if available. 554 * @return the active MasterThread, null if none is active. 555 */ 556 public MasterThread getMasterThread() { 557 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 558 if (mt.getMaster().isActiveMaster()) { 559 return mt; 560 } 561 } 562 return null; 563 } 564 565 /** 566 * Returns the master at the specified index, if available. 567 * @return the active HMaster, null if none is active. 568 */ 569 public HMaster getMaster(final int serverNumber) { 570 return this.hbaseCluster.getMaster(serverNumber); 571 } 572 573 /** 574 * Cause a master to exit without shutting down entire cluster. 575 * @param serverNumber Used as index into a list. 576 */ 577 public String abortMaster(int serverNumber) { 578 HMaster server = getMaster(serverNumber); 579 LOG.info("Aborting " + server.toString()); 580 server.abort("Aborting for tests", new Exception("Trace info")); 581 return server.toString(); 582 } 583 584 /** 585 * Shut down the specified master cleanly 586 * @param serverNumber Used as index into a list. 587 * @return the region server that was stopped 588 */ 589 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 590 return stopMaster(serverNumber, true); 591 } 592 593 /** 594 * Shut down the specified master cleanly 595 * @param serverNumber Used as index into a list. 596 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 597 * shutdown. Usually we do but you do not want to do this if you are running 598 * multiple master in a test and you shut down one before end of the test. 599 * @return the master that was stopped 600 */ 601 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 602 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 603 LOG.info("Stopping " + server.toString()); 604 server.getMaster().stop("Stopping master " + serverNumber); 605 return server; 606 } 607 608 /** 609 * Wait for the specified master to stop. Removes this thread from list of running threads. n 610 * * @return Name of master that just went down. 611 */ 612 public String waitOnMaster(final int serverNumber) { 613 return this.hbaseCluster.waitOnMaster(serverNumber); 614 } 615 616 /** 617 * Blocks until there is an active master and that master has completed initialization. 618 * @return true if an active master becomes available. false if there are no masters left. n 619 */ 620 @Override 621 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 622 List<JVMClusterUtil.MasterThread> mts; 623 long start = EnvironmentEdgeManager.currentTime(); 624 while ( 625 !(mts = getMasterThreads()).isEmpty() 626 && (EnvironmentEdgeManager.currentTime() - start) < timeout 627 ) { 628 for (JVMClusterUtil.MasterThread mt : mts) { 629 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 630 return true; 631 } 632 } 633 634 Threads.sleep(100); 635 } 636 return false; 637 } 638 639 /** 640 * @return List of master threads. 641 */ 642 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 643 return this.hbaseCluster.getMasters(); 644 } 645 646 /** 647 * @return List of live master threads (skips the aborted and the killed) 648 */ 649 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 650 return this.hbaseCluster.getLiveMasters(); 651 } 652 653 /** 654 * Wait for Mini HBase Cluster to shut down. 655 */ 656 public void join() { 657 this.hbaseCluster.join(); 658 } 659 660 /** 661 * Shut down the mini HBase cluster 662 */ 663 @Override 664 public void shutdown() throws IOException { 665 if (this.hbaseCluster != null) { 666 this.hbaseCluster.shutdown(); 667 } 668 } 669 670 @Override 671 public void close() throws IOException { 672 } 673 674 @Override 675 public ClusterMetrics getClusterMetrics() throws IOException { 676 HMaster master = getMaster(); 677 return master == null ? null : master.getClusterMetrics(); 678 } 679 680 private void executeFlush(HRegion region) throws IOException { 681 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 682 return; 683 } 684 // retry 5 times if we can not flush 685 for (int i = 0; i < 5; i++) { 686 FlushResult result = region.flush(true); 687 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 688 return; 689 } 690 Threads.sleep(1000); 691 } 692 } 693 694 /** 695 * Call flushCache on all regions on all participating regionservers. 696 */ 697 public void flushcache() throws IOException { 698 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 699 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 700 executeFlush(r); 701 } 702 } 703 } 704 705 /** 706 * Call flushCache on all regions of the specified table. 707 */ 708 public void flushcache(TableName tableName) throws IOException { 709 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 710 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 711 if (r.getTableDescriptor().getTableName().equals(tableName)) { 712 executeFlush(r); 713 } 714 } 715 } 716 } 717 718 /** 719 * Call flushCache on all regions on all participating regionservers. n 720 */ 721 public void compact(boolean major) throws IOException { 722 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 723 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 724 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 725 r.compact(major); 726 } 727 } 728 } 729 } 730 731 /** 732 * Call flushCache on all regions of the specified table. n 733 */ 734 public void compact(TableName tableName, boolean major) throws IOException { 735 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 736 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 737 if (r.getTableDescriptor().getTableName().equals(tableName)) { 738 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 739 r.compact(major); 740 } 741 } 742 } 743 } 744 } 745 746 /** 747 * @return Number of live region servers in the cluster currently. 748 */ 749 public int getNumLiveRegionServers() { 750 return this.hbaseCluster.getLiveRegionServers().size(); 751 } 752 753 /** 754 * @return List of region server threads. Does not return the master even though it is also a 755 * region server. 756 */ 757 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 758 return this.hbaseCluster.getRegionServers(); 759 } 760 761 /** 762 * @return List of live region server threads (skips the aborted and the killed) 763 */ 764 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 765 return this.hbaseCluster.getLiveRegionServers(); 766 } 767 768 /** 769 * Grab a numbered region server of your choice. n * @return region server 770 */ 771 public HRegionServer getRegionServer(int serverNumber) { 772 return hbaseCluster.getRegionServer(serverNumber); 773 } 774 775 public HRegionServer getRegionServer(ServerName serverName) { 776 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 777 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 778 } 779 780 public List<HRegion> getRegions(byte[] tableName) { 781 return getRegions(TableName.valueOf(tableName)); 782 } 783 784 public List<HRegion> getRegions(TableName tableName) { 785 List<HRegion> ret = new ArrayList<>(); 786 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 787 HRegionServer hrs = rst.getRegionServer(); 788 for (Region region : hrs.getOnlineRegionsLocalContext()) { 789 if (region.getTableDescriptor().getTableName().equals(tableName)) { 790 ret.add((HRegion) region); 791 } 792 } 793 } 794 return ret; 795 } 796 797 /** 798 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 799 * regionName. Returns -1 if none found. 800 */ 801 public int getServerWithMeta() { 802 return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 803 } 804 805 /** 806 * Get the location of the specified region 807 * @param regionName Name of the region in bytes 808 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 809 * hbase:meta. Returns -1 if none found. 810 */ 811 public int getServerWith(byte[] regionName) { 812 int index = 0; 813 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 814 HRegionServer hrs = rst.getRegionServer(); 815 if (!hrs.isStopped()) { 816 Region region = hrs.getOnlineRegion(regionName); 817 if (region != null) { 818 return index; 819 } 820 } 821 index++; 822 } 823 return -1; 824 } 825 826 @Override 827 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 828 throws IOException { 829 int index = getServerWith(regionName); 830 if (index < 0) { 831 return null; 832 } 833 return getRegionServer(index).getServerName(); 834 } 835 836 /** 837 * Counts the total numbers of regions being served by the currently online region servers by 838 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 839 * catalog tables. 840 * @return number of regions being served by all region servers 841 */ 842 public long countServedRegions() { 843 long count = 0; 844 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 845 count += rst.getRegionServer().getNumberOfOnlineRegions(); 846 } 847 return count; 848 } 849 850 /** 851 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 852 * mini-cluster back for clean shutdown. 853 */ 854 public void killAll() { 855 // Do backups first. 856 MasterThread activeMaster = null; 857 for (MasterThread masterThread : getMasterThreads()) { 858 if (!masterThread.getMaster().isActiveMaster()) { 859 masterThread.getMaster().abort("killAll"); 860 } else { 861 activeMaster = masterThread; 862 } 863 } 864 // Do active after. 865 if (activeMaster != null) { 866 activeMaster.getMaster().abort("killAll"); 867 } 868 for (RegionServerThread rst : getRegionServerThreads()) { 869 rst.getRegionServer().abort("killAll"); 870 } 871 } 872 873 @Override 874 public void waitUntilShutDown() { 875 this.hbaseCluster.join(); 876 } 877 878 public List<HRegion> findRegionsForTable(TableName tableName) { 879 ArrayList<HRegion> ret = new ArrayList<>(); 880 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 881 HRegionServer hrs = rst.getRegionServer(); 882 for (Region region : hrs.getRegions(tableName)) { 883 if (region.getTableDescriptor().getTableName().equals(tableName)) { 884 ret.add((HRegion) region); 885 } 886 } 887 } 888 return ret; 889 } 890 891 protected int getRegionServerIndex(ServerName serverName) { 892 // we have a small number of region servers, this should be fine for now. 893 List<RegionServerThread> servers = getRegionServerThreads(); 894 for (int i = 0; i < servers.size(); i++) { 895 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 896 return i; 897 } 898 } 899 return -1; 900 } 901 902 protected int getMasterIndex(ServerName serverName) { 903 List<MasterThread> masters = getMasterThreads(); 904 for (int i = 0; i < masters.size(); i++) { 905 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 906 return i; 907 } 908 } 909 return -1; 910 } 911}