001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedAction; 023import java.util.ArrayList; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 049 050/** 051 * This class creates a single process HBase cluster. 052 * each server. The master uses the 'default' FileSystem. The RegionServers, 053 * if we are running on DistributedFilesystem, create a FileSystem instance 054 * each and will close down their instance on the way out. 055 */ 056@InterfaceAudience.Public 057public class MiniHBaseCluster extends HBaseCluster { 058 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 * @throws IOException 067 */ 068 public MiniHBaseCluster(Configuration conf, int numRegionServers) 069 throws IOException, InterruptedException { 070 this(conf, 1, numRegionServers); 071 } 072 073 /** 074 * Start a MiniHBaseCluster. 075 * @param conf Configuration to be used for cluster 076 * @param numMasters initial number of masters to start. 077 * @param numRegionServers initial number of region servers to start. 078 * @throws IOException 079 */ 080 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 081 throws IOException, InterruptedException { 082 this(conf, numMasters, numRegionServers, null, null); 083 } 084 085 /** 086 * Start a MiniHBaseCluster. 087 * @param conf Configuration to be used for cluster 088 * @param numMasters initial number of masters to start. 089 * @param numRegionServers initial number of region servers to start. 090 */ 091 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 092 Class<? extends HMaster> masterClass, 093 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 094 throws IOException, InterruptedException { 095 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 096 } 097 098 /** 099 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 100 * restart where for sure the regionservers come up on same address+port (but 101 * just with different startcode); by default mini hbase clusters choose new 102 * arbitrary ports on each cluster start. 103 * @throws IOException 104 * @throws InterruptedException 105 */ 106 public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 107 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 108 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 109 throws IOException, InterruptedException { 110 super(conf); 111 112 // Hadoop 2 113 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 114 115 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 116 regionserverClass); 117 this.initialClusterStatus = getClusterMetrics(); 118 } 119 120 public Configuration getConfiguration() { 121 return this.conf; 122 } 123 124 /** 125 * Subclass so can get at protected methods (none at moment). Also, creates 126 * a FileSystem instance per instantiation. Adds a shutdown own FileSystem 127 * on the way out. Shuts down own Filesystem only, not All filesystems as 128 * the FileSystem system exit hook does. 129 */ 130 public static class MiniHBaseClusterRegionServer extends HRegionServer { 131 private Thread shutdownThread = null; 132 private User user = null; 133 /** 134 * List of RegionServers killed so far. ServerName also comprises startCode of a server, 135 * so any restarted instances of the same server will have different ServerName and will not 136 * coincide with past dead ones. So there's no need to cleanup this list. 137 */ 138 static Set<ServerName> killedServers = new HashSet<>(); 139 140 public MiniHBaseClusterRegionServer(Configuration conf) 141 throws IOException, InterruptedException { 142 super(conf); 143 this.user = User.getCurrent(); 144 } 145 146 /* 147 * @param c 148 * @param currentfs We return this if we did not make a new one. 149 * @param uniqueName Same name used to help identify the created fs. 150 * @return A new fs instance if we are up on DistributeFileSystem. 151 * @throws IOException 152 */ 153 154 @Override 155 protected void handleReportForDutyResponse( 156 final RegionServerStartupResponse c) throws IOException { 157 super.handleReportForDutyResponse(c); 158 // Run this thread to shutdown our filesystem on way out. 159 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 160 } 161 162 @Override 163 public void run() { 164 try { 165 this.user.runAs(new PrivilegedAction<Object>() { 166 @Override 167 public Object run() { 168 runRegionServer(); 169 return null; 170 } 171 }); 172 } catch (Throwable t) { 173 LOG.error("Exception in run", t); 174 } finally { 175 // Run this on the way out. 176 if (this.shutdownThread != null) { 177 this.shutdownThread.start(); 178 Threads.shutdown(this.shutdownThread, 30000); 179 } 180 } 181 } 182 183 private void runRegionServer() { 184 super.run(); 185 } 186 187 @Override 188 protected void kill() { 189 killedServers.add(getServerName()); 190 super.kill(); 191 } 192 193 @Override 194 public void abort(final String reason, final Throwable cause) { 195 this.user.runAs(new PrivilegedAction<Object>() { 196 @Override 197 public Object run() { 198 abortRegionServer(reason, cause); 199 return null; 200 } 201 }); 202 } 203 204 private void abortRegionServer(String reason, Throwable cause) { 205 super.abort(reason, cause); 206 } 207 } 208 209 /** 210 * Alternate shutdown hook. 211 * Just shuts down the passed fs, not all as default filesystem hook does. 212 */ 213 static class SingleFileSystemShutdownThread extends Thread { 214 private final FileSystem fs; 215 SingleFileSystemShutdownThread(final FileSystem fs) { 216 super("Shutdown of " + fs); 217 this.fs = fs; 218 } 219 @Override 220 public void run() { 221 try { 222 LOG.info("Hook closing fs=" + this.fs); 223 this.fs.close(); 224 } catch (NullPointerException npe) { 225 LOG.debug("Need to fix these: " + npe.toString()); 226 } catch (IOException e) { 227 LOG.warn("Running hook", e); 228 } 229 } 230 } 231 232 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 233 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 234 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 235 throws IOException, InterruptedException { 236 try { 237 if (masterClass == null){ 238 masterClass = HMaster.class; 239 } 240 if (regionserverClass == null){ 241 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 242 } 243 244 // start up a LocalHBaseCluster 245 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 246 masterClass, regionserverClass); 247 248 // manually add the regionservers as other users 249 for (int i = 0; i < nRegionNodes; i++) { 250 Configuration rsConf = HBaseConfiguration.create(conf); 251 if (rsPorts != null) { 252 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 253 } 254 User user = HBaseTestingUtility.getDifferentUser(rsConf, 255 ".hfs."+index++); 256 hbaseCluster.addRegionServer(rsConf, i, user); 257 } 258 259 hbaseCluster.startup(); 260 } catch (IOException e) { 261 shutdown(); 262 throw e; 263 } catch (Throwable t) { 264 LOG.error("Error starting cluster", t); 265 shutdown(); 266 throw new IOException("Shutting down", t); 267 } 268 } 269 270 @Override 271 public void startRegionServer(String hostname, int port) throws IOException { 272 final Configuration newConf = HBaseConfiguration.create(conf); 273 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 274 startRegionServer(newConf); 275 } 276 277 @Override 278 public void killRegionServer(ServerName serverName) throws IOException { 279 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 280 if (server instanceof MiniHBaseClusterRegionServer) { 281 LOG.info("Killing " + server.toString()); 282 ((MiniHBaseClusterRegionServer) server).kill(); 283 } else { 284 abortRegionServer(getRegionServerIndex(serverName)); 285 } 286 } 287 288 @Override 289 public boolean isKilledRS(ServerName serverName) { 290 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 291 } 292 293 @Override 294 public void stopRegionServer(ServerName serverName) throws IOException { 295 stopRegionServer(getRegionServerIndex(serverName)); 296 } 297 298 @Override 299 public void suspendRegionServer(ServerName serverName) throws IOException { 300 suspendRegionServer(getRegionServerIndex(serverName)); 301 } 302 303 @Override 304 public void resumeRegionServer(ServerName serverName) throws IOException { 305 resumeRegionServer(getRegionServerIndex(serverName)); 306 } 307 308 @Override 309 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 310 //ignore timeout for now 311 waitOnRegionServer(getRegionServerIndex(serverName)); 312 } 313 314 @Override 315 public void startZkNode(String hostname, int port) throws IOException { 316 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 317 } 318 319 @Override 320 public void killZkNode(ServerName serverName) throws IOException { 321 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 322 } 323 324 @Override 325 public void stopZkNode(ServerName serverName) throws IOException { 326 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 327 } 328 329 @Override 330 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 331 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 332 } 333 334 @Override 335 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 336 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 337 } 338 339 @Override 340 public void startDataNode(ServerName serverName) throws IOException { 341 LOG.warn("Starting datanodes on mini cluster is not supported"); 342 } 343 344 @Override 345 public void killDataNode(ServerName serverName) throws IOException { 346 LOG.warn("Aborting datanodes on mini cluster is not supported"); 347 } 348 349 @Override 350 public void stopDataNode(ServerName serverName) throws IOException { 351 LOG.warn("Stopping datanodes on mini cluster is not supported"); 352 } 353 354 @Override 355 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 356 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 357 } 358 359 @Override 360 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 361 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 362 } 363 364 @Override 365 public void startNameNode(ServerName serverName) throws IOException { 366 LOG.warn("Starting namenodes on mini cluster is not supported"); 367 } 368 369 @Override 370 public void killNameNode(ServerName serverName) throws IOException { 371 LOG.warn("Aborting namenodes on mini cluster is not supported"); 372 } 373 374 @Override 375 public void stopNameNode(ServerName serverName) throws IOException { 376 LOG.warn("Stopping namenodes on mini cluster is not supported"); 377 } 378 379 @Override 380 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 381 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 382 } 383 384 @Override 385 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 386 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 387 } 388 389 @Override 390 public void startMaster(String hostname, int port) throws IOException { 391 this.startMaster(); 392 } 393 394 @Override 395 public void killMaster(ServerName serverName) throws IOException { 396 abortMaster(getMasterIndex(serverName)); 397 } 398 399 @Override 400 public void stopMaster(ServerName serverName) throws IOException { 401 stopMaster(getMasterIndex(serverName)); 402 } 403 404 @Override 405 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 406 //ignore timeout for now 407 waitOnMaster(getMasterIndex(serverName)); 408 } 409 410 /** 411 * Starts a region server thread running 412 * 413 * @throws IOException 414 * @return New RegionServerThread 415 */ 416 public JVMClusterUtil.RegionServerThread startRegionServer() 417 throws IOException { 418 final Configuration newConf = HBaseConfiguration.create(conf); 419 return startRegionServer(newConf); 420 } 421 422 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 423 throws IOException { 424 User rsUser = 425 HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++); 426 JVMClusterUtil.RegionServerThread t = null; 427 try { 428 t = hbaseCluster.addRegionServer( 429 configuration, hbaseCluster.getRegionServers().size(), rsUser); 430 t.start(); 431 t.waitForServerOnline(); 432 } catch (InterruptedException ie) { 433 throw new IOException("Interrupted adding regionserver to cluster", ie); 434 } 435 return t; 436 } 437 438 /** 439 * Starts a region server thread and waits until its processed by master. Throws an exception 440 * when it can't start a region server or when the region server is not processed by master 441 * within the timeout. 442 * 443 * @return New RegionServerThread 444 */ 445 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 446 throws IOException { 447 448 JVMClusterUtil.RegionServerThread t = startRegionServer(); 449 ServerName rsServerName = t.getRegionServer().getServerName(); 450 451 long start = System.currentTimeMillis(); 452 ClusterStatus clusterStatus = getClusterStatus(); 453 while ((System.currentTimeMillis() - start) < timeout) { 454 if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { 455 return t; 456 } 457 Threads.sleep(100); 458 } 459 if (t.getRegionServer().isOnline()) { 460 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 461 } else { 462 throw new IOException("RS: " + rsServerName + " is offline"); 463 } 464 } 465 466 /** 467 * Cause a region server to exit doing basic clean up only on its way out. 468 * @param serverNumber Used as index into a list. 469 */ 470 public String abortRegionServer(int serverNumber) { 471 HRegionServer server = getRegionServer(serverNumber); 472 LOG.info("Aborting " + server.toString()); 473 server.abort("Aborting for tests", new Exception("Trace info")); 474 return server.toString(); 475 } 476 477 /** 478 * Shut down the specified region server cleanly 479 * 480 * @param serverNumber Used as index into a list. 481 * @return the region server that was stopped 482 */ 483 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 484 return stopRegionServer(serverNumber, true); 485 } 486 487 /** 488 * Shut down the specified region server cleanly 489 * 490 * @param serverNumber Used as index into a list. 491 * @param shutdownFS True is we are to shutdown the filesystem as part of this 492 * regionserver's shutdown. Usually we do but you do not want to do this if 493 * you are running multiple regionservers in a test and you shut down one 494 * before end of the test. 495 * @return the region server that was stopped 496 */ 497 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 498 final boolean shutdownFS) { 499 JVMClusterUtil.RegionServerThread server = 500 hbaseCluster.getRegionServers().get(serverNumber); 501 LOG.info("Stopping " + server.toString()); 502 server.getRegionServer().stop("Stopping rs " + serverNumber); 503 return server; 504 } 505 506 /** 507 * Suspend the specified region server 508 * @param serverNumber Used as index into a list. 509 * @return 510 */ 511 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 512 JVMClusterUtil.RegionServerThread server = 513 hbaseCluster.getRegionServers().get(serverNumber); 514 LOG.info("Suspending {}", server.toString()); 515 server.suspend(); 516 return server; 517 } 518 519 /** 520 * Resume the specified region server 521 * @param serverNumber Used as index into a list. 522 * @return 523 */ 524 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 525 JVMClusterUtil.RegionServerThread server = 526 hbaseCluster.getRegionServers().get(serverNumber); 527 LOG.info("Resuming {}", server.toString()); 528 server.resume(); 529 return server; 530 } 531 532 /** 533 * Wait for the specified region server to stop. Removes this thread from list 534 * of running threads. 535 * @param serverNumber 536 * @return Name of region server that just went down. 537 */ 538 public String waitOnRegionServer(final int serverNumber) { 539 return this.hbaseCluster.waitOnRegionServer(serverNumber); 540 } 541 542 543 /** 544 * Starts a master thread running 545 * 546 * @return New RegionServerThread 547 */ 548 public JVMClusterUtil.MasterThread startMaster() throws IOException { 549 Configuration c = HBaseConfiguration.create(conf); 550 User user = 551 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++); 552 553 JVMClusterUtil.MasterThread t = null; 554 try { 555 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 556 t.start(); 557 } catch (InterruptedException ie) { 558 throw new IOException("Interrupted adding master to cluster", ie); 559 } 560 conf.set(HConstants.MASTER_ADDRS_KEY, 561 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 562 return t; 563 } 564 565 /** 566 * Returns the current active master, if available. 567 * @return the active HMaster, null if none is active. 568 */ 569 @Override 570 public MasterService.BlockingInterface getMasterAdminService() { 571 return this.hbaseCluster.getActiveMaster().getMasterRpcServices(); 572 } 573 574 /** 575 * Returns the current active master, if available. 576 * @return the active HMaster, null if none is active. 577 */ 578 public HMaster getMaster() { 579 return this.hbaseCluster.getActiveMaster(); 580 } 581 582 /** 583 * Returns the current active master thread, if available. 584 * @return the active MasterThread, null if none is active. 585 */ 586 public MasterThread getMasterThread() { 587 for (MasterThread mt: hbaseCluster.getLiveMasters()) { 588 if (mt.getMaster().isActiveMaster()) { 589 return mt; 590 } 591 } 592 return null; 593 } 594 595 /** 596 * Returns the master at the specified index, if available. 597 * @return the active HMaster, null if none is active. 598 */ 599 public HMaster getMaster(final int serverNumber) { 600 return this.hbaseCluster.getMaster(serverNumber); 601 } 602 603 /** 604 * Cause a master to exit without shutting down entire cluster. 605 * @param serverNumber Used as index into a list. 606 */ 607 public String abortMaster(int serverNumber) { 608 HMaster server = getMaster(serverNumber); 609 LOG.info("Aborting " + server.toString()); 610 server.abort("Aborting for tests", new Exception("Trace info")); 611 return server.toString(); 612 } 613 614 /** 615 * Shut down the specified master cleanly 616 * 617 * @param serverNumber Used as index into a list. 618 * @return the region server that was stopped 619 */ 620 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 621 return stopMaster(serverNumber, true); 622 } 623 624 /** 625 * Shut down the specified master cleanly 626 * 627 * @param serverNumber Used as index into a list. 628 * @param shutdownFS True is we are to shutdown the filesystem as part of this 629 * master's shutdown. Usually we do but you do not want to do this if 630 * you are running multiple master in a test and you shut down one 631 * before end of the test. 632 * @return the master that was stopped 633 */ 634 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, 635 final boolean shutdownFS) { 636 JVMClusterUtil.MasterThread server = 637 hbaseCluster.getMasters().get(serverNumber); 638 LOG.info("Stopping " + server.toString()); 639 server.getMaster().stop("Stopping master " + serverNumber); 640 return server; 641 } 642 643 /** 644 * Wait for the specified master to stop. Removes this thread from list 645 * of running threads. 646 * @param serverNumber 647 * @return Name of master that just went down. 648 */ 649 public String waitOnMaster(final int serverNumber) { 650 return this.hbaseCluster.waitOnMaster(serverNumber); 651 } 652 653 /** 654 * Blocks until there is an active master and that master has completed 655 * initialization. 656 * 657 * @return true if an active master becomes available. false if there are no 658 * masters left. 659 * @throws InterruptedException 660 */ 661 @Override 662 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 663 List<JVMClusterUtil.MasterThread> mts; 664 long start = System.currentTimeMillis(); 665 while (!(mts = getMasterThreads()).isEmpty() 666 && (System.currentTimeMillis() - start) < timeout) { 667 for (JVMClusterUtil.MasterThread mt : mts) { 668 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 669 return true; 670 } 671 } 672 673 Threads.sleep(100); 674 } 675 return false; 676 } 677 678 /** 679 * @return List of master threads. 680 */ 681 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 682 return this.hbaseCluster.getMasters(); 683 } 684 685 /** 686 * @return List of live master threads (skips the aborted and the killed) 687 */ 688 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 689 return this.hbaseCluster.getLiveMasters(); 690 } 691 692 /** 693 * Wait for Mini HBase Cluster to shut down. 694 */ 695 public void join() { 696 this.hbaseCluster.join(); 697 } 698 699 /** 700 * Shut down the mini HBase cluster 701 */ 702 @Override 703 public void shutdown() throws IOException { 704 if (this.hbaseCluster != null) { 705 this.hbaseCluster.shutdown(); 706 } 707 } 708 709 @Override 710 public void close() throws IOException { 711 } 712 713 /** 714 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 715 * Use {@link #getClusterMetrics()} instead. 716 */ 717 @Deprecated 718 public ClusterStatus getClusterStatus() throws IOException { 719 HMaster master = getMaster(); 720 return master == null ? null : new ClusterStatus(master.getClusterMetrics()); 721 } 722 723 @Override 724 public ClusterMetrics getClusterMetrics() throws IOException { 725 HMaster master = getMaster(); 726 return master == null ? null : master.getClusterMetrics(); 727 } 728 729 private void executeFlush(HRegion region) throws IOException { 730 // retry 5 times if we can not flush 731 for (int i = 0; i < 5; i++) { 732 FlushResult result = region.flush(true); 733 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 734 return; 735 } 736 Threads.sleep(1000); 737 } 738 } 739 740 /** 741 * Call flushCache on all regions on all participating regionservers. 742 */ 743 public void flushcache() throws IOException { 744 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 745 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 746 executeFlush(r); 747 } 748 } 749 } 750 751 /** 752 * Call flushCache on all regions of the specified table. 753 */ 754 public void flushcache(TableName tableName) throws IOException { 755 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 756 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 757 if (r.getTableDescriptor().getTableName().equals(tableName)) { 758 executeFlush(r); 759 } 760 } 761 } 762 } 763 764 /** 765 * Call flushCache on all regions on all participating regionservers. 766 * @throws IOException 767 */ 768 public void compact(boolean major) throws IOException { 769 for (JVMClusterUtil.RegionServerThread t: 770 this.hbaseCluster.getRegionServers()) { 771 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 772 r.compact(major); 773 } 774 } 775 } 776 777 /** 778 * Call flushCache on all regions of the specified table. 779 * @throws IOException 780 */ 781 public void compact(TableName tableName, boolean major) throws IOException { 782 for (JVMClusterUtil.RegionServerThread t: 783 this.hbaseCluster.getRegionServers()) { 784 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 785 if(r.getTableDescriptor().getTableName().equals(tableName)) { 786 r.compact(major); 787 } 788 } 789 } 790 } 791 792 /** 793 * @return Number of live region servers in the cluster currently. 794 */ 795 public int getNumLiveRegionServers() { 796 return this.hbaseCluster.getLiveRegionServers().size(); 797 } 798 799 /** 800 * @return List of region server threads. Does not return the master even though it is also 801 * a region server. 802 */ 803 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 804 return this.hbaseCluster.getRegionServers(); 805 } 806 807 /** 808 * @return List of live region server threads (skips the aborted and the killed) 809 */ 810 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 811 return this.hbaseCluster.getLiveRegionServers(); 812 } 813 814 /** 815 * Grab a numbered region server of your choice. 816 * @param serverNumber 817 * @return region server 818 */ 819 public HRegionServer getRegionServer(int serverNumber) { 820 return hbaseCluster.getRegionServer(serverNumber); 821 } 822 823 public HRegionServer getRegionServer(ServerName serverName) { 824 return hbaseCluster.getRegionServers().stream() 825 .map(t -> t.getRegionServer()) 826 .filter(r -> r.getServerName().equals(serverName)) 827 .findFirst().orElse(null); 828 } 829 830 public List<HRegion> getRegions(byte[] tableName) { 831 return getRegions(TableName.valueOf(tableName)); 832 } 833 834 public List<HRegion> getRegions(TableName tableName) { 835 List<HRegion> ret = new ArrayList<>(); 836 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 837 HRegionServer hrs = rst.getRegionServer(); 838 for (Region region : hrs.getOnlineRegionsLocalContext()) { 839 if (region.getTableDescriptor().getTableName().equals(tableName)) { 840 ret.add((HRegion)region); 841 } 842 } 843 } 844 return ret; 845 } 846 847 /** 848 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 849 * of HRS carrying regionName. Returns -1 if none found. 850 */ 851 public int getServerWithMeta() { 852 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); 853 } 854 855 /** 856 * Get the location of the specified region 857 * @param regionName Name of the region in bytes 858 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 859 * of HRS carrying hbase:meta. Returns -1 if none found. 860 */ 861 public int getServerWith(byte[] regionName) { 862 int index = -1; 863 int count = 0; 864 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { 865 HRegionServer hrs = rst.getRegionServer(); 866 if (!hrs.isStopped()) { 867 Region region = hrs.getOnlineRegion(regionName); 868 if (region != null) { 869 index = count; 870 break; 871 } 872 } 873 count++; 874 } 875 return index; 876 } 877 878 @Override 879 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 880 throws IOException { 881 // Assume there is only one master thread which is the active master. 882 // If there are multiple master threads, the backup master threads 883 // should hold some regions. Please refer to #countServedRegions 884 // to see how we find out all regions. 885 HMaster master = getMaster(); 886 Region region = master.getOnlineRegion(regionName); 887 if (region != null) { 888 return master.getServerName(); 889 } 890 int index = getServerWith(regionName); 891 if (index < 0) { 892 return null; 893 } 894 return getRegionServer(index).getServerName(); 895 } 896 897 /** 898 * Counts the total numbers of regions being served by the currently online 899 * region servers by asking each how many regions they have. Does not look 900 * at hbase:meta at all. Count includes catalog tables. 901 * @return number of regions being served by all region servers 902 */ 903 public long countServedRegions() { 904 long count = 0; 905 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 906 count += rst.getRegionServer().getNumberOfOnlineRegions(); 907 } 908 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) { 909 count += mt.getMaster().getNumberOfOnlineRegions(); 910 } 911 return count; 912 } 913 914 /** 915 * Do a simulated kill all masters and regionservers. Useful when it is 916 * impossible to bring the mini-cluster back for clean shutdown. 917 */ 918 public void killAll() { 919 // Do backups first. 920 MasterThread activeMaster = null; 921 for (MasterThread masterThread : getMasterThreads()) { 922 if (!masterThread.getMaster().isActiveMaster()) { 923 masterThread.getMaster().abort("killAll"); 924 } else { 925 activeMaster = masterThread; 926 } 927 } 928 // Do active after. 929 if (activeMaster != null) { 930 activeMaster.getMaster().abort("killAll"); 931 } 932 for (RegionServerThread rst : getRegionServerThreads()) { 933 rst.getRegionServer().abort("killAll"); 934 } 935 } 936 937 @Override 938 public void waitUntilShutDown() { 939 this.hbaseCluster.join(); 940 } 941 942 public List<HRegion> findRegionsForTable(TableName tableName) { 943 ArrayList<HRegion> ret = new ArrayList<>(); 944 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 945 HRegionServer hrs = rst.getRegionServer(); 946 for (Region region : hrs.getRegions(tableName)) { 947 if (region.getTableDescriptor().getTableName().equals(tableName)) { 948 ret.add((HRegion)region); 949 } 950 } 951 } 952 return ret; 953 } 954 955 956 protected int getRegionServerIndex(ServerName serverName) { 957 //we have a small number of region servers, this should be fine for now. 958 List<RegionServerThread> servers = getRegionServerThreads(); 959 for (int i=0; i < servers.size(); i++) { 960 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 961 return i; 962 } 963 } 964 return -1; 965 } 966 967 protected int getMasterIndex(ServerName serverName) { 968 List<MasterThread> masters = getMasterThreads(); 969 for (int i = 0; i < masters.size(); i++) { 970 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 971 return i; 972 } 973 } 974 return -1; 975 } 976 977 @Override 978 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { 979 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 980 } 981 982 @Override 983 public ClientService.BlockingInterface getClientProtocol(ServerName serverName) 984 throws IOException { 985 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 986 } 987}