001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedAction; 023import java.util.ArrayList; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.hbase.master.HMaster; 030import org.apache.hadoop.hbase.regionserver.HRegion; 031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.regionserver.Region; 034import org.apache.hadoop.hbase.security.User; 035import org.apache.hadoop.hbase.test.MetricsAssertHelper; 036import org.apache.hadoop.hbase.util.JVMClusterUtil; 037import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 048 049/** 050 * This class creates a single process HBase cluster. 051 * each server. The master uses the 'default' FileSystem. The RegionServers, 052 * if we are running on DistributedFilesystem, create a FileSystem instance 053 * each and will close down their instance on the way out. 054 */ 055@InterfaceAudience.Public 056public class MiniHBaseCluster extends HBaseCluster { 057 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 058 public LocalHBaseCluster hbaseCluster; 059 private static int index; 060 061 /** 062 * Start a MiniHBaseCluster. 063 * @param conf Configuration to be used for cluster 064 * @param numRegionServers initial number of region servers to start. 065 * @throws IOException 066 */ 067 public MiniHBaseCluster(Configuration conf, int numRegionServers) 068 throws IOException, InterruptedException { 069 this(conf, 1, numRegionServers); 070 } 071 072 /** 073 * Start a MiniHBaseCluster. 074 * @param conf Configuration to be used for cluster 075 * @param numMasters initial number of masters to start. 076 * @param numRegionServers initial number of region servers to start. 077 * @throws IOException 078 */ 079 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 080 throws IOException, InterruptedException { 081 this(conf, numMasters, numRegionServers, null, null); 082 } 083 084 /** 085 * Start a MiniHBaseCluster. 086 * @param conf Configuration to be used for cluster 087 * @param numMasters initial number of masters to start. 088 * @param numRegionServers initial number of region servers to start. 089 */ 090 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 091 Class<? extends HMaster> masterClass, 092 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 093 throws IOException, InterruptedException { 094 this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass); 095 } 096 097 /** 098 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 099 * restart where for sure the regionservers come up on same address+port (but 100 * just with different startcode); by default mini hbase clusters choose new 101 * arbitrary ports on each cluster start. 102 * @throws IOException 103 * @throws InterruptedException 104 */ 105 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 106 List<Integer> rsPorts, 107 Class<? extends HMaster> masterClass, 108 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 109 throws IOException, InterruptedException { 110 super(conf); 111 112 // Hadoop 2 113 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 114 115 init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass); 116 this.initialClusterStatus = getClusterStatus(); 117 } 118 119 public Configuration getConfiguration() { 120 return this.conf; 121 } 122 123 /** 124 * Subclass so can get at protected methods (none at moment). Also, creates 125 * a FileSystem instance per instantiation. Adds a shutdown own FileSystem 126 * on the way out. Shuts down own Filesystem only, not All filesystems as 127 * the FileSystem system exit hook does. 128 */ 129 public static class MiniHBaseClusterRegionServer extends HRegionServer { 130 private Thread shutdownThread = null; 131 private User user = null; 132 /** 133 * List of RegionServers killed so far. ServerName also comprises startCode of a server, 134 * so any restarted instances of the same server will have different ServerName and will not 135 * coincide with past dead ones. So there's no need to cleanup this list. 136 */ 137 static Set<ServerName> killedServers = new HashSet<>(); 138 139 public MiniHBaseClusterRegionServer(Configuration conf) 140 throws IOException, InterruptedException { 141 super(conf); 142 this.user = User.getCurrent(); 143 } 144 145 /* 146 * @param c 147 * @param currentfs We return this if we did not make a new one. 148 * @param uniqueName Same name used to help identify the created fs. 149 * @return A new fs instance if we are up on DistributeFileSystem. 150 * @throws IOException 151 */ 152 153 @Override 154 protected void handleReportForDutyResponse( 155 final RegionServerStartupResponse c) throws IOException { 156 super.handleReportForDutyResponse(c); 157 // Run this thread to shutdown our filesystem on way out. 158 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 159 } 160 161 @Override 162 public void run() { 163 try { 164 this.user.runAs(new PrivilegedAction<Object>() { 165 @Override 166 public Object run() { 167 runRegionServer(); 168 return null; 169 } 170 }); 171 } catch (Throwable t) { 172 LOG.error("Exception in run", t); 173 } finally { 174 // Run this on the way out. 175 if (this.shutdownThread != null) { 176 this.shutdownThread.start(); 177 Threads.shutdown(this.shutdownThread, 30000); 178 } 179 } 180 } 181 182 private void runRegionServer() { 183 super.run(); 184 } 185 186 @Override 187 protected void kill() { 188 killedServers.add(getServerName()); 189 super.kill(); 190 } 191 192 @Override 193 public void abort(final String reason, final Throwable cause) { 194 this.user.runAs(new PrivilegedAction<Object>() { 195 @Override 196 public Object run() { 197 abortRegionServer(reason, cause); 198 return null; 199 } 200 }); 201 } 202 203 private void abortRegionServer(String reason, Throwable cause) { 204 super.abort(reason, cause); 205 } 206 } 207 208 /** 209 * Alternate shutdown hook. 210 * Just shuts down the passed fs, not all as default filesystem hook does. 211 */ 212 static class SingleFileSystemShutdownThread extends Thread { 213 private final FileSystem fs; 214 SingleFileSystemShutdownThread(final FileSystem fs) { 215 super("Shutdown of " + fs); 216 this.fs = fs; 217 } 218 @Override 219 public void run() { 220 try { 221 LOG.info("Hook closing fs=" + this.fs); 222 this.fs.close(); 223 } catch (NullPointerException npe) { 224 LOG.debug("Need to fix these: " + npe.toString()); 225 } catch (IOException e) { 226 LOG.warn("Running hook", e); 227 } 228 } 229 } 230 231 private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts, 232 Class<? extends HMaster> masterClass, 233 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 234 throws IOException, InterruptedException { 235 try { 236 if (masterClass == null){ 237 masterClass = HMaster.class; 238 } 239 if (regionserverClass == null){ 240 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 241 } 242 243 // start up a LocalHBaseCluster 244 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, 245 masterClass, regionserverClass); 246 247 // manually add the regionservers as other users 248 for (int i = 0; i < nRegionNodes; i++) { 249 Configuration rsConf = HBaseConfiguration.create(conf); 250 if (rsPorts != null) { 251 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 252 } 253 User user = HBaseTestingUtility.getDifferentUser(rsConf, 254 ".hfs."+index++); 255 hbaseCluster.addRegionServer(rsConf, i, user); 256 } 257 258 hbaseCluster.startup(); 259 } catch (IOException e) { 260 shutdown(); 261 throw e; 262 } catch (Throwable t) { 263 LOG.error("Error starting cluster", t); 264 shutdown(); 265 throw new IOException("Shutting down", t); 266 } 267 } 268 269 @Override 270 public void startRegionServer(String hostname, int port) throws IOException { 271 this.startRegionServer(); 272 } 273 274 @Override 275 public void killRegionServer(ServerName serverName) throws IOException { 276 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 277 if (server instanceof MiniHBaseClusterRegionServer) { 278 LOG.info("Killing " + server.toString()); 279 ((MiniHBaseClusterRegionServer) server).kill(); 280 } else { 281 abortRegionServer(getRegionServerIndex(serverName)); 282 } 283 } 284 285 @Override 286 public boolean isKilledRS(ServerName serverName) { 287 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 288 } 289 290 @Override 291 public void stopRegionServer(ServerName serverName) throws IOException { 292 stopRegionServer(getRegionServerIndex(serverName)); 293 } 294 295 @Override 296 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 297 //ignore timeout for now 298 waitOnRegionServer(getRegionServerIndex(serverName)); 299 } 300 301 @Override 302 public void startZkNode(String hostname, int port) throws IOException { 303 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 304 } 305 306 @Override 307 public void killZkNode(ServerName serverName) throws IOException { 308 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 309 } 310 311 @Override 312 public void stopZkNode(ServerName serverName) throws IOException { 313 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 314 } 315 316 @Override 317 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 318 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 319 } 320 321 @Override 322 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 323 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 324 } 325 326 @Override 327 public void startDataNode(ServerName serverName) throws IOException { 328 LOG.warn("Starting datanodes on mini cluster is not supported"); 329 } 330 331 @Override 332 public void killDataNode(ServerName serverName) throws IOException { 333 LOG.warn("Aborting datanodes on mini cluster is not supported"); 334 } 335 336 @Override 337 public void stopDataNode(ServerName serverName) throws IOException { 338 LOG.warn("Stopping datanodes on mini cluster is not supported"); 339 } 340 341 @Override 342 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 343 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 344 } 345 346 @Override 347 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 348 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 349 } 350 351 @Override 352 public void startMaster(String hostname, int port) throws IOException { 353 this.startMaster(); 354 } 355 356 @Override 357 public void killMaster(ServerName serverName) throws IOException { 358 abortMaster(getMasterIndex(serverName)); 359 } 360 361 @Override 362 public void stopMaster(ServerName serverName) throws IOException { 363 stopMaster(getMasterIndex(serverName)); 364 } 365 366 @Override 367 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 368 //ignore timeout for now 369 waitOnMaster(getMasterIndex(serverName)); 370 } 371 372 /** 373 * Starts a region server thread running 374 * 375 * @throws IOException 376 * @return New RegionServerThread 377 */ 378 public JVMClusterUtil.RegionServerThread startRegionServer() 379 throws IOException { 380 final Configuration newConf = HBaseConfiguration.create(conf); 381 User rsUser = 382 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++); 383 JVMClusterUtil.RegionServerThread t = null; 384 try { 385 t = hbaseCluster.addRegionServer( 386 newConf, hbaseCluster.getRegionServers().size(), rsUser); 387 t.start(); 388 t.waitForServerOnline(); 389 } catch (InterruptedException ie) { 390 throw new IOException("Interrupted adding regionserver to cluster", ie); 391 } 392 return t; 393 } 394 395 /** 396 * Starts a region server thread and waits until its processed by master. Throws an exception 397 * when it can't start a region server or when the region server is not processed by master 398 * within the timeout. 399 * 400 * @return New RegionServerThread 401 */ 402 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 403 throws IOException { 404 405 JVMClusterUtil.RegionServerThread t = startRegionServer(); 406 ServerName rsServerName = t.getRegionServer().getServerName(); 407 408 long start = System.currentTimeMillis(); 409 ClusterStatus clusterStatus = getClusterStatus(); 410 while ((System.currentTimeMillis() - start) < timeout) { 411 if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { 412 return t; 413 } 414 Threads.sleep(100); 415 } 416 if (t.getRegionServer().isOnline()) { 417 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 418 } else { 419 throw new IOException("RS: " + rsServerName + " is offline"); 420 } 421 } 422 423 /** 424 * Cause a region server to exit doing basic clean up only on its way out. 425 * @param serverNumber Used as index into a list. 426 */ 427 public String abortRegionServer(int serverNumber) { 428 HRegionServer server = getRegionServer(serverNumber); 429 LOG.info("Aborting " + server.toString()); 430 server.abort("Aborting for tests", new Exception("Trace info")); 431 return server.toString(); 432 } 433 434 /** 435 * Shut down the specified region server cleanly 436 * 437 * @param serverNumber Used as index into a list. 438 * @return the region server that was stopped 439 */ 440 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 441 return stopRegionServer(serverNumber, true); 442 } 443 444 /** 445 * Shut down the specified region server cleanly 446 * 447 * @param serverNumber Used as index into a list. 448 * @param shutdownFS True is we are to shutdown the filesystem as part of this 449 * regionserver's shutdown. Usually we do but you do not want to do this if 450 * you are running multiple regionservers in a test and you shut down one 451 * before end of the test. 452 * @return the region server that was stopped 453 */ 454 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 455 final boolean shutdownFS) { 456 JVMClusterUtil.RegionServerThread server = 457 hbaseCluster.getRegionServers().get(serverNumber); 458 LOG.info("Stopping " + server.toString()); 459 server.getRegionServer().stop("Stopping rs " + serverNumber); 460 return server; 461 } 462 463 /** 464 * Wait for the specified region server to stop. Removes this thread from list 465 * of running threads. 466 * @param serverNumber 467 * @return Name of region server that just went down. 468 */ 469 public String waitOnRegionServer(final int serverNumber) { 470 return this.hbaseCluster.waitOnRegionServer(serverNumber); 471 } 472 473 474 /** 475 * Starts a master thread running 476 * 477 * @return New RegionServerThread 478 */ 479 public JVMClusterUtil.MasterThread startMaster() throws IOException { 480 Configuration c = HBaseConfiguration.create(conf); 481 User user = 482 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++); 483 484 JVMClusterUtil.MasterThread t = null; 485 try { 486 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 487 t.start(); 488 } catch (InterruptedException ie) { 489 throw new IOException("Interrupted adding master to cluster", ie); 490 } 491 return t; 492 } 493 494 /** 495 * Returns the current active master, if available. 496 * @return the active HMaster, null if none is active. 497 */ 498 @Override 499 public MasterService.BlockingInterface getMasterAdminService() { 500 return this.hbaseCluster.getActiveMaster().getMasterRpcServices(); 501 } 502 503 /** 504 * Returns the current active master, if available. 505 * @return the active HMaster, null if none is active. 506 */ 507 public HMaster getMaster() { 508 return this.hbaseCluster.getActiveMaster(); 509 } 510 511 /** 512 * Returns the current active master thread, if available. 513 * @return the active MasterThread, null if none is active. 514 */ 515 public MasterThread getMasterThread() { 516 for (MasterThread mt: hbaseCluster.getLiveMasters()) { 517 if (mt.getMaster().isActiveMaster()) { 518 return mt; 519 } 520 } 521 return null; 522 } 523 524 /** 525 * Returns the master at the specified index, if available. 526 * @return the active HMaster, null if none is active. 527 */ 528 public HMaster getMaster(final int serverNumber) { 529 return this.hbaseCluster.getMaster(serverNumber); 530 } 531 532 /** 533 * Cause a master to exit without shutting down entire cluster. 534 * @param serverNumber Used as index into a list. 535 */ 536 public String abortMaster(int serverNumber) { 537 HMaster server = getMaster(serverNumber); 538 LOG.info("Aborting " + server.toString()); 539 server.abort("Aborting for tests", new Exception("Trace info")); 540 return server.toString(); 541 } 542 543 /** 544 * Shut down the specified master cleanly 545 * 546 * @param serverNumber Used as index into a list. 547 * @return the region server that was stopped 548 */ 549 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 550 return stopMaster(serverNumber, true); 551 } 552 553 /** 554 * Shut down the specified master cleanly 555 * 556 * @param serverNumber Used as index into a list. 557 * @param shutdownFS True is we are to shutdown the filesystem as part of this 558 * master's shutdown. Usually we do but you do not want to do this if 559 * you are running multiple master in a test and you shut down one 560 * before end of the test. 561 * @return the master that was stopped 562 */ 563 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, 564 final boolean shutdownFS) { 565 JVMClusterUtil.MasterThread server = 566 hbaseCluster.getMasters().get(serverNumber); 567 LOG.info("Stopping " + server.toString()); 568 server.getMaster().stop("Stopping master " + serverNumber); 569 return server; 570 } 571 572 /** 573 * Wait for the specified master to stop. Removes this thread from list 574 * of running threads. 575 * @param serverNumber 576 * @return Name of master that just went down. 577 */ 578 public String waitOnMaster(final int serverNumber) { 579 return this.hbaseCluster.waitOnMaster(serverNumber); 580 } 581 582 /** 583 * Blocks until there is an active master and that master has completed 584 * initialization. 585 * 586 * @return true if an active master becomes available. false if there are no 587 * masters left. 588 * @throws InterruptedException 589 */ 590 @Override 591 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 592 List<JVMClusterUtil.MasterThread> mts; 593 long start = System.currentTimeMillis(); 594 while (!(mts = getMasterThreads()).isEmpty() 595 && (System.currentTimeMillis() - start) < timeout) { 596 for (JVMClusterUtil.MasterThread mt : mts) { 597 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 598 return true; 599 } 600 } 601 602 Threads.sleep(100); 603 } 604 return false; 605 } 606 607 /** 608 * @return List of master threads. 609 */ 610 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 611 return this.hbaseCluster.getMasters(); 612 } 613 614 /** 615 * @return List of live master threads (skips the aborted and the killed) 616 */ 617 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 618 return this.hbaseCluster.getLiveMasters(); 619 } 620 621 /** 622 * Wait for Mini HBase Cluster to shut down. 623 */ 624 public void join() { 625 this.hbaseCluster.join(); 626 } 627 628 /** 629 * Shut down the mini HBase cluster 630 */ 631 @Override 632 public void shutdown() throws IOException { 633 if (this.hbaseCluster != null) { 634 this.hbaseCluster.shutdown(); 635 } 636 } 637 638 @Override 639 public void close() throws IOException { 640 } 641 642 /** 643 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 644 * Use {@link #getClusterMetrics()} instead. 645 */ 646 @Deprecated 647 public ClusterStatus getClusterStatus() throws IOException { 648 HMaster master = getMaster(); 649 return master == null ? null : new ClusterStatus(master.getClusterMetrics()); 650 } 651 652 @Override 653 public ClusterMetrics getClusterMetrics() throws IOException { 654 HMaster master = getMaster(); 655 return master == null ? null : master.getClusterMetrics(); 656 } 657 658 private void executeFlush(HRegion region) throws IOException { 659 // retry 5 times if we can not flush 660 for (int i = 0; i < 5; i++) { 661 FlushResult result = region.flush(true); 662 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 663 return; 664 } 665 Threads.sleep(1000); 666 } 667 } 668 669 /** 670 * Call flushCache on all regions on all participating regionservers. 671 */ 672 public void flushcache() throws IOException { 673 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 674 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 675 executeFlush(r); 676 } 677 } 678 } 679 680 /** 681 * Call flushCache on all regions of the specified table. 682 */ 683 public void flushcache(TableName tableName) throws IOException { 684 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 685 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 686 if (r.getTableDescriptor().getTableName().equals(tableName)) { 687 executeFlush(r); 688 } 689 } 690 } 691 } 692 693 /** 694 * Call flushCache on all regions on all participating regionservers. 695 * @throws IOException 696 */ 697 public void compact(boolean major) throws IOException { 698 for (JVMClusterUtil.RegionServerThread t: 699 this.hbaseCluster.getRegionServers()) { 700 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 701 r.compact(major); 702 } 703 } 704 } 705 706 /** 707 * Call flushCache on all regions of the specified table. 708 * @throws IOException 709 */ 710 public void compact(TableName tableName, boolean major) throws IOException { 711 for (JVMClusterUtil.RegionServerThread t: 712 this.hbaseCluster.getRegionServers()) { 713 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 714 if(r.getTableDescriptor().getTableName().equals(tableName)) { 715 r.compact(major); 716 } 717 } 718 } 719 } 720 721 /** 722 * @return List of region server threads. Does not return the master even though it is also 723 * a region server. 724 */ 725 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 726 return this.hbaseCluster.getRegionServers(); 727 } 728 729 /** 730 * @return List of live region server threads (skips the aborted and the killed) 731 */ 732 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 733 return this.hbaseCluster.getLiveRegionServers(); 734 } 735 736 /** 737 * Grab a numbered region server of your choice. 738 * @param serverNumber 739 * @return region server 740 */ 741 public HRegionServer getRegionServer(int serverNumber) { 742 return hbaseCluster.getRegionServer(serverNumber); 743 } 744 745 public HRegionServer getRegionServer(ServerName serverName) { 746 return hbaseCluster.getRegionServers().stream() 747 .map(t -> t.getRegionServer()) 748 .filter(r -> r.getServerName().equals(serverName)) 749 .findFirst().orElse(null); 750 } 751 752 public List<HRegion> getRegions(byte[] tableName) { 753 return getRegions(TableName.valueOf(tableName)); 754 } 755 756 public List<HRegion> getRegions(TableName tableName) { 757 List<HRegion> ret = new ArrayList<>(); 758 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 759 HRegionServer hrs = rst.getRegionServer(); 760 for (Region region : hrs.getOnlineRegionsLocalContext()) { 761 if (region.getTableDescriptor().getTableName().equals(tableName)) { 762 ret.add((HRegion)region); 763 } 764 } 765 } 766 return ret; 767 } 768 769 /** 770 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 771 * of HRS carrying regionName. Returns -1 if none found. 772 */ 773 public int getServerWithMeta() { 774 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); 775 } 776 777 /** 778 * Get the location of the specified region 779 * @param regionName Name of the region in bytes 780 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 781 * of HRS carrying hbase:meta. Returns -1 if none found. 782 */ 783 public int getServerWith(byte[] regionName) { 784 int index = -1; 785 int count = 0; 786 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { 787 HRegionServer hrs = rst.getRegionServer(); 788 if (!hrs.isStopped()) { 789 Region region = hrs.getOnlineRegion(regionName); 790 if (region != null) { 791 index = count; 792 break; 793 } 794 } 795 count++; 796 } 797 return index; 798 } 799 800 @Override 801 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 802 throws IOException { 803 // Assume there is only one master thread which is the active master. 804 // If there are multiple master threads, the backup master threads 805 // should hold some regions. Please refer to #countServedRegions 806 // to see how we find out all regions. 807 HMaster master = getMaster(); 808 Region region = master.getOnlineRegion(regionName); 809 if (region != null) { 810 return master.getServerName(); 811 } 812 int index = getServerWith(regionName); 813 if (index < 0) { 814 return null; 815 } 816 return getRegionServer(index).getServerName(); 817 } 818 819 /** 820 * Counts the total numbers of regions being served by the currently online 821 * region servers by asking each how many regions they have. Does not look 822 * at hbase:meta at all. Count includes catalog tables. 823 * @return number of regions being served by all region servers 824 */ 825 public long countServedRegions() { 826 long count = 0; 827 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 828 count += rst.getRegionServer().getNumberOfOnlineRegions(); 829 } 830 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) { 831 count += mt.getMaster().getNumberOfOnlineRegions(); 832 } 833 return count; 834 } 835 836 /** 837 * Do a simulated kill all masters and regionservers. Useful when it is 838 * impossible to bring the mini-cluster back for clean shutdown. 839 */ 840 public void killAll() { 841 // Do backups first. 842 MasterThread activeMaster = null; 843 for (MasterThread masterThread : getMasterThreads()) { 844 if (!masterThread.getMaster().isActiveMaster()) { 845 masterThread.getMaster().abort("killAll"); 846 } else { 847 activeMaster = masterThread; 848 } 849 } 850 // Do active after. 851 if (activeMaster != null) { 852 activeMaster.getMaster().abort("killAll"); 853 } 854 for (RegionServerThread rst : getRegionServerThreads()) { 855 rst.getRegionServer().abort("killAll"); 856 } 857 } 858 859 @Override 860 public void waitUntilShutDown() { 861 this.hbaseCluster.join(); 862 } 863 864 public List<HRegion> findRegionsForTable(TableName tableName) { 865 ArrayList<HRegion> ret = new ArrayList<>(); 866 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 867 HRegionServer hrs = rst.getRegionServer(); 868 for (Region region : hrs.getRegions(tableName)) { 869 if (region.getTableDescriptor().getTableName().equals(tableName)) { 870 ret.add((HRegion)region); 871 } 872 } 873 } 874 return ret; 875 } 876 877 878 protected int getRegionServerIndex(ServerName serverName) { 879 //we have a small number of region servers, this should be fine for now. 880 List<RegionServerThread> servers = getRegionServerThreads(); 881 for (int i=0; i < servers.size(); i++) { 882 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 883 return i; 884 } 885 } 886 return -1; 887 } 888 889 protected int getMasterIndex(ServerName serverName) { 890 List<MasterThread> masters = getMasterThreads(); 891 for (int i = 0; i < masters.size(); i++) { 892 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 893 return i; 894 } 895 } 896 return -1; 897 } 898 899 @Override 900 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { 901 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 902 } 903 904 @Override 905 public ClientService.BlockingInterface getClientProtocol(ServerName serverName) 906 throws IOException { 907 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 908 } 909}