001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedAction; 023import java.util.ArrayList; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Set; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 049 050/** 051 * This class creates a single process HBase cluster. 052 * each server. The master uses the 'default' FileSystem. The RegionServers, 053 * if we are running on DistributedFilesystem, create a FileSystem instance 054 * each and will close down their instance on the way out. 055 */ 056@InterfaceAudience.Public 057public class MiniHBaseCluster extends HBaseCluster { 058 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 * @throws IOException 067 */ 068 public MiniHBaseCluster(Configuration conf, int numRegionServers) 069 throws IOException, InterruptedException { 070 this(conf, 1, numRegionServers); 071 } 072 073 /** 074 * Start a MiniHBaseCluster. 075 * @param conf Configuration to be used for cluster 076 * @param numMasters initial number of masters to start. 077 * @param numRegionServers initial number of region servers to start. 078 * @throws IOException 079 */ 080 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 081 throws IOException, InterruptedException { 082 this(conf, numMasters, numRegionServers, null, null); 083 } 084 085 /** 086 * Start a MiniHBaseCluster. 087 * @param conf Configuration to be used for cluster 088 * @param numMasters initial number of masters to start. 089 * @param numRegionServers initial number of region servers to start. 090 */ 091 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 092 Class<? extends HMaster> masterClass, 093 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 094 throws IOException, InterruptedException { 095 this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass); 096 } 097 098 /** 099 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 100 * restart where for sure the regionservers come up on same address+port (but 101 * just with different startcode); by default mini hbase clusters choose new 102 * arbitrary ports on each cluster start. 103 * @throws IOException 104 * @throws InterruptedException 105 */ 106 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 107 List<Integer> rsPorts, 108 Class<? extends HMaster> masterClass, 109 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 110 throws IOException, InterruptedException { 111 super(conf); 112 113 // Hadoop 2 114 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 115 116 init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass); 117 this.initialClusterStatus = getClusterStatus(); 118 } 119 120 public Configuration getConfiguration() { 121 return this.conf; 122 } 123 124 /** 125 * Subclass so can get at protected methods (none at moment). Also, creates 126 * a FileSystem instance per instantiation. Adds a shutdown own FileSystem 127 * on the way out. Shuts down own Filesystem only, not All filesystems as 128 * the FileSystem system exit hook does. 129 */ 130 public static class MiniHBaseClusterRegionServer extends HRegionServer { 131 private Thread shutdownThread = null; 132 private User user = null; 133 /** 134 * List of RegionServers killed so far. ServerName also comprises startCode of a server, 135 * so any restarted instances of the same server will have different ServerName and will not 136 * coincide with past dead ones. So there's no need to cleanup this list. 137 */ 138 static Set<ServerName> killedServers = new HashSet<>(); 139 140 public MiniHBaseClusterRegionServer(Configuration conf) 141 throws IOException, InterruptedException { 142 super(conf); 143 this.user = User.getCurrent(); 144 } 145 146 /* 147 * @param c 148 * @param currentfs We return this if we did not make a new one. 149 * @param uniqueName Same name used to help identify the created fs. 150 * @return A new fs instance if we are up on DistributeFileSystem. 151 * @throws IOException 152 */ 153 154 @Override 155 protected void handleReportForDutyResponse( 156 final RegionServerStartupResponse c) throws IOException { 157 super.handleReportForDutyResponse(c); 158 // Run this thread to shutdown our filesystem on way out. 159 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 160 } 161 162 @Override 163 public void run() { 164 try { 165 this.user.runAs(new PrivilegedAction<Object>() { 166 @Override 167 public Object run() { 168 runRegionServer(); 169 return null; 170 } 171 }); 172 } catch (Throwable t) { 173 LOG.error("Exception in run", t); 174 } finally { 175 // Run this on the way out. 176 if (this.shutdownThread != null) { 177 this.shutdownThread.start(); 178 Threads.shutdown(this.shutdownThread, 30000); 179 } 180 } 181 } 182 183 private void runRegionServer() { 184 super.run(); 185 } 186 187 @Override 188 protected void kill() { 189 killedServers.add(getServerName()); 190 super.kill(); 191 } 192 193 @Override 194 public void abort(final String reason, final Throwable cause) { 195 this.user.runAs(new PrivilegedAction<Object>() { 196 @Override 197 public Object run() { 198 abortRegionServer(reason, cause); 199 return null; 200 } 201 }); 202 } 203 204 private void abortRegionServer(String reason, Throwable cause) { 205 super.abort(reason, cause); 206 } 207 } 208 209 /** 210 * Alternate shutdown hook. 211 * Just shuts down the passed fs, not all as default filesystem hook does. 212 */ 213 static class SingleFileSystemShutdownThread extends Thread { 214 private final FileSystem fs; 215 SingleFileSystemShutdownThread(final FileSystem fs) { 216 super("Shutdown of " + fs); 217 this.fs = fs; 218 } 219 @Override 220 public void run() { 221 try { 222 LOG.info("Hook closing fs=" + this.fs); 223 this.fs.close(); 224 } catch (NullPointerException npe) { 225 LOG.debug("Need to fix these: " + npe.toString()); 226 } catch (IOException e) { 227 LOG.warn("Running hook", e); 228 } 229 } 230 } 231 232 private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts, 233 Class<? extends HMaster> masterClass, 234 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 235 throws IOException, InterruptedException { 236 try { 237 if (masterClass == null){ 238 masterClass = HMaster.class; 239 } 240 if (regionserverClass == null){ 241 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 242 } 243 244 // start up a LocalHBaseCluster 245 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, 246 masterClass, regionserverClass); 247 248 // manually add the regionservers as other users 249 for (int i = 0; i < nRegionNodes; i++) { 250 Configuration rsConf = HBaseConfiguration.create(conf); 251 if (rsPorts != null) { 252 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 253 } 254 User user = HBaseTestingUtility.getDifferentUser(rsConf, 255 ".hfs."+index++); 256 hbaseCluster.addRegionServer(rsConf, i, user); 257 } 258 259 hbaseCluster.startup(); 260 } catch (IOException e) { 261 shutdown(); 262 throw e; 263 } catch (Throwable t) { 264 LOG.error("Error starting cluster", t); 265 shutdown(); 266 throw new IOException("Shutting down", t); 267 } 268 } 269 270 @Override 271 public void startRegionServer(String hostname, int port) throws IOException { 272 final Configuration newConf = HBaseConfiguration.create(conf); 273 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 274 startRegionServer(newConf); 275 } 276 277 @Override 278 public void killRegionServer(ServerName serverName) throws IOException { 279 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 280 if (server instanceof MiniHBaseClusterRegionServer) { 281 LOG.info("Killing " + server.toString()); 282 ((MiniHBaseClusterRegionServer) server).kill(); 283 } else { 284 abortRegionServer(getRegionServerIndex(serverName)); 285 } 286 } 287 288 @Override 289 public boolean isKilledRS(ServerName serverName) { 290 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 291 } 292 293 @Override 294 public void stopRegionServer(ServerName serverName) throws IOException { 295 stopRegionServer(getRegionServerIndex(serverName)); 296 } 297 298 @Override 299 public void suspendRegionServer(ServerName serverName) throws IOException { 300 suspendRegionServer(getRegionServerIndex(serverName)); 301 } 302 303 @Override 304 public void resumeRegionServer(ServerName serverName) throws IOException { 305 resumeRegionServer(getRegionServerIndex(serverName)); 306 } 307 308 @Override 309 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 310 //ignore timeout for now 311 waitOnRegionServer(getRegionServerIndex(serverName)); 312 } 313 314 @Override 315 public void startZkNode(String hostname, int port) throws IOException { 316 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 317 } 318 319 @Override 320 public void killZkNode(ServerName serverName) throws IOException { 321 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 322 } 323 324 @Override 325 public void stopZkNode(ServerName serverName) throws IOException { 326 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 327 } 328 329 @Override 330 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 331 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 332 } 333 334 @Override 335 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 336 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 337 } 338 339 @Override 340 public void startDataNode(ServerName serverName) throws IOException { 341 LOG.warn("Starting datanodes on mini cluster is not supported"); 342 } 343 344 @Override 345 public void killDataNode(ServerName serverName) throws IOException { 346 LOG.warn("Aborting datanodes on mini cluster is not supported"); 347 } 348 349 @Override 350 public void stopDataNode(ServerName serverName) throws IOException { 351 LOG.warn("Stopping datanodes on mini cluster is not supported"); 352 } 353 354 @Override 355 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 356 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 357 } 358 359 @Override 360 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 361 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 362 } 363 364 @Override 365 public void startNameNode(ServerName serverName) throws IOException { 366 LOG.warn("Starting namenodes on mini cluster is not supported"); 367 } 368 369 @Override 370 public void killNameNode(ServerName serverName) throws IOException { 371 LOG.warn("Aborting namenodes on mini cluster is not supported"); 372 } 373 374 @Override 375 public void stopNameNode(ServerName serverName) throws IOException { 376 LOG.warn("Stopping namenodes on mini cluster is not supported"); 377 } 378 379 @Override 380 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 381 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 382 } 383 384 @Override 385 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 386 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 387 } 388 389 @Override 390 public void startMaster(String hostname, int port) throws IOException { 391 this.startMaster(); 392 } 393 394 @Override 395 public void killMaster(ServerName serverName) throws IOException { 396 abortMaster(getMasterIndex(serverName)); 397 } 398 399 @Override 400 public void stopMaster(ServerName serverName) throws IOException { 401 stopMaster(getMasterIndex(serverName)); 402 } 403 404 @Override 405 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 406 //ignore timeout for now 407 waitOnMaster(getMasterIndex(serverName)); 408 } 409 410 /** 411 * Starts a region server thread running 412 * 413 * @throws IOException 414 * @return New RegionServerThread 415 */ 416 public JVMClusterUtil.RegionServerThread startRegionServer() 417 throws IOException { 418 final Configuration newConf = HBaseConfiguration.create(conf); 419 return startRegionServer(newConf); 420 } 421 422 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 423 throws IOException { 424 User rsUser = 425 HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++); 426 JVMClusterUtil.RegionServerThread t = null; 427 try { 428 t = hbaseCluster.addRegionServer( 429 configuration, hbaseCluster.getRegionServers().size(), rsUser); 430 t.start(); 431 t.waitForServerOnline(); 432 } catch (InterruptedException ie) { 433 throw new IOException("Interrupted adding regionserver to cluster", ie); 434 } 435 return t; 436 } 437 438 /** 439 * Starts a region server thread and waits until its processed by master. Throws an exception 440 * when it can't start a region server or when the region server is not processed by master 441 * within the timeout. 442 * 443 * @return New RegionServerThread 444 */ 445 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 446 throws IOException { 447 448 JVMClusterUtil.RegionServerThread t = startRegionServer(); 449 ServerName rsServerName = t.getRegionServer().getServerName(); 450 451 long start = System.currentTimeMillis(); 452 ClusterStatus clusterStatus = getClusterStatus(); 453 while ((System.currentTimeMillis() - start) < timeout) { 454 if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { 455 return t; 456 } 457 Threads.sleep(100); 458 } 459 if (t.getRegionServer().isOnline()) { 460 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 461 } else { 462 throw new IOException("RS: " + rsServerName + " is offline"); 463 } 464 } 465 466 /** 467 * Cause a region server to exit doing basic clean up only on its way out. 468 * @param serverNumber Used as index into a list. 469 */ 470 public String abortRegionServer(int serverNumber) { 471 HRegionServer server = getRegionServer(serverNumber); 472 LOG.info("Aborting " + server.toString()); 473 server.abort("Aborting for tests", new Exception("Trace info")); 474 return server.toString(); 475 } 476 477 /** 478 * Shut down the specified region server cleanly 479 * 480 * @param serverNumber Used as index into a list. 481 * @return the region server that was stopped 482 */ 483 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 484 return stopRegionServer(serverNumber, true); 485 } 486 487 /** 488 * Shut down the specified region server cleanly 489 * 490 * @param serverNumber Used as index into a list. 491 * @param shutdownFS True is we are to shutdown the filesystem as part of this 492 * regionserver's shutdown. Usually we do but you do not want to do this if 493 * you are running multiple regionservers in a test and you shut down one 494 * before end of the test. 495 * @return the region server that was stopped 496 */ 497 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 498 final boolean shutdownFS) { 499 JVMClusterUtil.RegionServerThread server = 500 hbaseCluster.getRegionServers().get(serverNumber); 501 LOG.info("Stopping " + server.toString()); 502 server.getRegionServer().stop("Stopping rs " + serverNumber); 503 return server; 504 } 505 506 /** 507 * Suspend the specified region server 508 * @param serverNumber Used as index into a list. 509 * @return 510 */ 511 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 512 JVMClusterUtil.RegionServerThread server = 513 hbaseCluster.getRegionServers().get(serverNumber); 514 LOG.info("Suspending {}", server.toString()); 515 server.suspend(); 516 return server; 517 } 518 519 /** 520 * Resume the specified region server 521 * @param serverNumber Used as index into a list. 522 * @return 523 */ 524 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 525 JVMClusterUtil.RegionServerThread server = 526 hbaseCluster.getRegionServers().get(serverNumber); 527 LOG.info("Resuming {}", server.toString()); 528 server.resume(); 529 return server; 530 } 531 532 /** 533 * Wait for the specified region server to stop. Removes this thread from list 534 * of running threads. 535 * @param serverNumber 536 * @return Name of region server that just went down. 537 */ 538 public String waitOnRegionServer(final int serverNumber) { 539 return this.hbaseCluster.waitOnRegionServer(serverNumber); 540 } 541 542 543 /** 544 * Starts a master thread running 545 * 546 * @return New RegionServerThread 547 */ 548 public JVMClusterUtil.MasterThread startMaster() throws IOException { 549 Configuration c = HBaseConfiguration.create(conf); 550 User user = 551 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++); 552 553 JVMClusterUtil.MasterThread t = null; 554 try { 555 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 556 t.start(); 557 } catch (InterruptedException ie) { 558 throw new IOException("Interrupted adding master to cluster", ie); 559 } 560 return t; 561 } 562 563 /** 564 * Returns the current active master, if available. 565 * @return the active HMaster, null if none is active. 566 */ 567 @Override 568 public MasterService.BlockingInterface getMasterAdminService() { 569 return this.hbaseCluster.getActiveMaster().getMasterRpcServices(); 570 } 571 572 /** 573 * Returns the current active master, if available. 574 * @return the active HMaster, null if none is active. 575 */ 576 public HMaster getMaster() { 577 return this.hbaseCluster.getActiveMaster(); 578 } 579 580 /** 581 * Returns the current active master thread, if available. 582 * @return the active MasterThread, null if none is active. 583 */ 584 public MasterThread getMasterThread() { 585 for (MasterThread mt: hbaseCluster.getLiveMasters()) { 586 if (mt.getMaster().isActiveMaster()) { 587 return mt; 588 } 589 } 590 return null; 591 } 592 593 /** 594 * Returns the master at the specified index, if available. 595 * @return the active HMaster, null if none is active. 596 */ 597 public HMaster getMaster(final int serverNumber) { 598 return this.hbaseCluster.getMaster(serverNumber); 599 } 600 601 /** 602 * Cause a master to exit without shutting down entire cluster. 603 * @param serverNumber Used as index into a list. 604 */ 605 public String abortMaster(int serverNumber) { 606 HMaster server = getMaster(serverNumber); 607 LOG.info("Aborting " + server.toString()); 608 server.abort("Aborting for tests", new Exception("Trace info")); 609 return server.toString(); 610 } 611 612 /** 613 * Shut down the specified master cleanly 614 * 615 * @param serverNumber Used as index into a list. 616 * @return the region server that was stopped 617 */ 618 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 619 return stopMaster(serverNumber, true); 620 } 621 622 /** 623 * Shut down the specified master cleanly 624 * 625 * @param serverNumber Used as index into a list. 626 * @param shutdownFS True is we are to shutdown the filesystem as part of this 627 * master's shutdown. Usually we do but you do not want to do this if 628 * you are running multiple master in a test and you shut down one 629 * before end of the test. 630 * @return the master that was stopped 631 */ 632 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, 633 final boolean shutdownFS) { 634 JVMClusterUtil.MasterThread server = 635 hbaseCluster.getMasters().get(serverNumber); 636 LOG.info("Stopping " + server.toString()); 637 server.getMaster().stop("Stopping master " + serverNumber); 638 return server; 639 } 640 641 /** 642 * Wait for the specified master to stop. Removes this thread from list 643 * of running threads. 644 * @param serverNumber 645 * @return Name of master that just went down. 646 */ 647 public String waitOnMaster(final int serverNumber) { 648 return this.hbaseCluster.waitOnMaster(serverNumber); 649 } 650 651 /** 652 * Blocks until there is an active master and that master has completed 653 * initialization. 654 * 655 * @return true if an active master becomes available. false if there are no 656 * masters left. 657 * @throws InterruptedException 658 */ 659 @Override 660 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 661 List<JVMClusterUtil.MasterThread> mts; 662 long start = System.currentTimeMillis(); 663 while (!(mts = getMasterThreads()).isEmpty() 664 && (System.currentTimeMillis() - start) < timeout) { 665 for (JVMClusterUtil.MasterThread mt : mts) { 666 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 667 return true; 668 } 669 } 670 671 Threads.sleep(100); 672 } 673 return false; 674 } 675 676 /** 677 * @return List of master threads. 678 */ 679 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 680 return this.hbaseCluster.getMasters(); 681 } 682 683 /** 684 * @return List of live master threads (skips the aborted and the killed) 685 */ 686 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 687 return this.hbaseCluster.getLiveMasters(); 688 } 689 690 /** 691 * Wait for Mini HBase Cluster to shut down. 692 */ 693 public void join() { 694 this.hbaseCluster.join(); 695 } 696 697 /** 698 * Shut down the mini HBase cluster 699 */ 700 @Override 701 public void shutdown() throws IOException { 702 if (this.hbaseCluster != null) { 703 this.hbaseCluster.shutdown(); 704 } 705 } 706 707 @Override 708 public void close() throws IOException { 709 } 710 711 /** 712 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 713 * Use {@link #getClusterMetrics()} instead. 714 */ 715 @Deprecated 716 public ClusterStatus getClusterStatus() throws IOException { 717 HMaster master = getMaster(); 718 return master == null ? null : new ClusterStatus(master.getClusterMetrics()); 719 } 720 721 @Override 722 public ClusterMetrics getClusterMetrics() throws IOException { 723 HMaster master = getMaster(); 724 return master == null ? null : master.getClusterMetrics(); 725 } 726 727 private void executeFlush(HRegion region) throws IOException { 728 // retry 5 times if we can not flush 729 for (int i = 0; i < 5; i++) { 730 FlushResult result = region.flush(true); 731 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 732 return; 733 } 734 Threads.sleep(1000); 735 } 736 } 737 738 /** 739 * Call flushCache on all regions on all participating regionservers. 740 */ 741 public void flushcache() throws IOException { 742 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 743 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 744 executeFlush(r); 745 } 746 } 747 } 748 749 /** 750 * Call flushCache on all regions of the specified table. 751 */ 752 public void flushcache(TableName tableName) throws IOException { 753 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 754 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 755 if (r.getTableDescriptor().getTableName().equals(tableName)) { 756 executeFlush(r); 757 } 758 } 759 } 760 } 761 762 /** 763 * Call flushCache on all regions on all participating regionservers. 764 * @throws IOException 765 */ 766 public void compact(boolean major) throws IOException { 767 for (JVMClusterUtil.RegionServerThread t: 768 this.hbaseCluster.getRegionServers()) { 769 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 770 r.compact(major); 771 } 772 } 773 } 774 775 /** 776 * Call flushCache on all regions of the specified table. 777 * @throws IOException 778 */ 779 public void compact(TableName tableName, boolean major) throws IOException { 780 for (JVMClusterUtil.RegionServerThread t: 781 this.hbaseCluster.getRegionServers()) { 782 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) { 783 if(r.getTableDescriptor().getTableName().equals(tableName)) { 784 r.compact(major); 785 } 786 } 787 } 788 } 789 790 /** 791 * @return Number of live region servers in the cluster currently. 792 */ 793 public int getNumLiveRegionServers() { 794 return this.hbaseCluster.getLiveRegionServers().size(); 795 } 796 797 /** 798 * @return List of region server threads. Does not return the master even though it is also 799 * a region server. 800 */ 801 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 802 return this.hbaseCluster.getRegionServers(); 803 } 804 805 /** 806 * @return List of live region server threads (skips the aborted and the killed) 807 */ 808 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 809 return this.hbaseCluster.getLiveRegionServers(); 810 } 811 812 /** 813 * Grab a numbered region server of your choice. 814 * @param serverNumber 815 * @return region server 816 */ 817 public HRegionServer getRegionServer(int serverNumber) { 818 return hbaseCluster.getRegionServer(serverNumber); 819 } 820 821 public HRegionServer getRegionServer(ServerName serverName) { 822 return hbaseCluster.getRegionServers().stream() 823 .map(t -> t.getRegionServer()) 824 .filter(r -> r.getServerName().equals(serverName)) 825 .findFirst().orElse(null); 826 } 827 828 public List<HRegion> getRegions(byte[] tableName) { 829 return getRegions(TableName.valueOf(tableName)); 830 } 831 832 public List<HRegion> getRegions(TableName tableName) { 833 List<HRegion> ret = new ArrayList<>(); 834 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 835 HRegionServer hrs = rst.getRegionServer(); 836 for (Region region : hrs.getOnlineRegionsLocalContext()) { 837 if (region.getTableDescriptor().getTableName().equals(tableName)) { 838 ret.add((HRegion)region); 839 } 840 } 841 } 842 return ret; 843 } 844 845 /** 846 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 847 * of HRS carrying regionName. Returns -1 if none found. 848 */ 849 public int getServerWithMeta() { 850 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); 851 } 852 853 /** 854 * Get the location of the specified region 855 * @param regionName Name of the region in bytes 856 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} 857 * of HRS carrying hbase:meta. Returns -1 if none found. 858 */ 859 public int getServerWith(byte[] regionName) { 860 int index = -1; 861 int count = 0; 862 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { 863 HRegionServer hrs = rst.getRegionServer(); 864 if (!hrs.isStopped()) { 865 Region region = hrs.getOnlineRegion(regionName); 866 if (region != null) { 867 index = count; 868 break; 869 } 870 } 871 count++; 872 } 873 return index; 874 } 875 876 @Override 877 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 878 throws IOException { 879 // Assume there is only one master thread which is the active master. 880 // If there are multiple master threads, the backup master threads 881 // should hold some regions. Please refer to #countServedRegions 882 // to see how we find out all regions. 883 HMaster master = getMaster(); 884 Region region = master.getOnlineRegion(regionName); 885 if (region != null) { 886 return master.getServerName(); 887 } 888 int index = getServerWith(regionName); 889 if (index < 0) { 890 return null; 891 } 892 return getRegionServer(index).getServerName(); 893 } 894 895 /** 896 * Counts the total numbers of regions being served by the currently online 897 * region servers by asking each how many regions they have. Does not look 898 * at hbase:meta at all. Count includes catalog tables. 899 * @return number of regions being served by all region servers 900 */ 901 public long countServedRegions() { 902 long count = 0; 903 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 904 count += rst.getRegionServer().getNumberOfOnlineRegions(); 905 } 906 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) { 907 count += mt.getMaster().getNumberOfOnlineRegions(); 908 } 909 return count; 910 } 911 912 /** 913 * Do a simulated kill all masters and regionservers. Useful when it is 914 * impossible to bring the mini-cluster back for clean shutdown. 915 */ 916 public void killAll() { 917 // Do backups first. 918 MasterThread activeMaster = null; 919 for (MasterThread masterThread : getMasterThreads()) { 920 if (!masterThread.getMaster().isActiveMaster()) { 921 masterThread.getMaster().abort("killAll"); 922 } else { 923 activeMaster = masterThread; 924 } 925 } 926 // Do active after. 927 if (activeMaster != null) { 928 activeMaster.getMaster().abort("killAll"); 929 } 930 for (RegionServerThread rst : getRegionServerThreads()) { 931 rst.getRegionServer().abort("killAll"); 932 } 933 } 934 935 @Override 936 public void waitUntilShutDown() { 937 this.hbaseCluster.join(); 938 } 939 940 public List<HRegion> findRegionsForTable(TableName tableName) { 941 ArrayList<HRegion> ret = new ArrayList<>(); 942 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 943 HRegionServer hrs = rst.getRegionServer(); 944 for (Region region : hrs.getRegions(tableName)) { 945 if (region.getTableDescriptor().getTableName().equals(tableName)) { 946 ret.add((HRegion)region); 947 } 948 } 949 } 950 return ret; 951 } 952 953 954 protected int getRegionServerIndex(ServerName serverName) { 955 //we have a small number of region servers, this should be fine for now. 956 List<RegionServerThread> servers = getRegionServerThreads(); 957 for (int i=0; i < servers.size(); i++) { 958 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 959 return i; 960 } 961 } 962 return -1; 963 } 964 965 protected int getMasterIndex(ServerName serverName) { 966 List<MasterThread> masters = getMasterThreads(); 967 for (int i = 0; i < masters.size(); i++) { 968 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 969 return i; 970 } 971 } 972 return -1; 973 } 974 975 @Override 976 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { 977 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 978 } 979 980 @Override 981 public ClientService.BlockingInterface getClientProtocol(ServerName serverName) 982 throws IOException { 983 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 984 } 985}