001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.RegionReplicaUtil; 029import org.apache.hadoop.hbase.master.HMaster; 030import org.apache.hadoop.hbase.regionserver.HRegion; 031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.regionserver.Region; 034import org.apache.hadoop.hbase.security.User; 035import org.apache.hadoop.hbase.test.MetricsAssertHelper; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 049 050/** 051 * This class creates a single process HBase cluster. each server. The master uses the 'default' 052 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 053 * instance each and will close down their instance on the way out. 054 */ 055@InterfaceAudience.Public 056public class MiniHBaseCluster extends HBaseCluster { 057 private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName()); 058 public LocalHBaseCluster hbaseCluster; 059 private static int index; 060 061 /** 062 * Start a MiniHBaseCluster. 063 * @param conf Configuration to be used for cluster 064 * @param numRegionServers initial number of region servers to start. n 065 */ 066 public MiniHBaseCluster(Configuration conf, int numRegionServers) 067 throws IOException, InterruptedException { 068 this(conf, 1, numRegionServers); 069 } 070 071 /** 072 * Start a MiniHBaseCluster. 073 * @param conf Configuration to be used for cluster 074 * @param numMasters initial number of masters to start. 075 * @param numRegionServers initial number of region servers to start. n 076 */ 077 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 078 throws IOException, InterruptedException { 079 this(conf, numMasters, numRegionServers, null, null); 080 } 081 082 /** 083 * Start a MiniHBaseCluster. 084 * @param conf Configuration to be used for cluster 085 * @param numMasters initial number of masters to start. 086 * @param numRegionServers initial number of region servers to start. 087 */ 088 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 089 Class<? extends HMaster> masterClass, 090 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 091 throws IOException, InterruptedException { 092 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 093 } 094 095 /** 096 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 097 * restart where for sure the regionservers come up on same address+port (but just 098 * with different startcode); by default mini hbase clusters choose new arbitrary 099 * ports on each cluster start. nn 100 */ 101 public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 102 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 103 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 104 throws IOException, InterruptedException { 105 super(conf); 106 107 // Hadoop 2 108 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 109 110 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 111 regionserverClass); 112 this.initialClusterStatus = getClusterMetrics(); 113 } 114 115 public Configuration getConfiguration() { 116 return this.conf; 117 } 118 119 /** 120 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 121 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 122 * only, not All filesystems as the FileSystem system exit hook does. 123 */ 124 public static class MiniHBaseClusterRegionServer extends HRegionServer { 125 private Thread shutdownThread = null; 126 private User user = null; 127 /** 128 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 129 * restarted instances of the same server will have different ServerName and will not coincide 130 * with past dead ones. So there's no need to cleanup this list. 131 */ 132 static Set<ServerName> killedServers = new HashSet<>(); 133 134 public MiniHBaseClusterRegionServer(Configuration conf) 135 throws IOException, InterruptedException { 136 super(conf); 137 this.user = User.getCurrent(); 138 } 139 140 /* 141 * n * @param currentfs We return this if we did not make a new one. 142 * @param uniqueName Same name used to help identify the created fs. 143 * @return A new fs instance if we are up on DistributeFileSystem. n 144 */ 145 146 @Override 147 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 148 throws IOException { 149 super.handleReportForDutyResponse(c); 150 // Run this thread to shutdown our filesystem on way out. 151 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 152 } 153 154 @Override 155 public void run() { 156 try { 157 this.user.runAs(new PrivilegedAction<Object>() { 158 @Override 159 public Object run() { 160 runRegionServer(); 161 return null; 162 } 163 }); 164 } catch (Throwable t) { 165 LOG.error("Exception in run", t); 166 } finally { 167 // Run this on the way out. 168 if (this.shutdownThread != null) { 169 this.shutdownThread.start(); 170 Threads.shutdown(this.shutdownThread, 30000); 171 } 172 } 173 } 174 175 private void runRegionServer() { 176 super.run(); 177 } 178 179 @Override 180 protected void kill() { 181 killedServers.add(getServerName()); 182 super.kill(); 183 } 184 185 @Override 186 public void abort(final String reason, final Throwable cause) { 187 this.user.runAs(new PrivilegedAction<Object>() { 188 @Override 189 public Object run() { 190 abortRegionServer(reason, cause); 191 return null; 192 } 193 }); 194 } 195 196 private void abortRegionServer(String reason, Throwable cause) { 197 super.abort(reason, cause); 198 } 199 } 200 201 /** 202 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook 203 * does. 204 */ 205 static class SingleFileSystemShutdownThread extends Thread { 206 private final FileSystem fs; 207 208 SingleFileSystemShutdownThread(final FileSystem fs) { 209 super("Shutdown of " + fs); 210 this.fs = fs; 211 } 212 213 @Override 214 public void run() { 215 try { 216 LOG.info("Hook closing fs=" + this.fs); 217 this.fs.close(); 218 } catch (NullPointerException npe) { 219 LOG.debug("Need to fix these: " + npe.toString()); 220 } catch (IOException e) { 221 LOG.warn("Running hook", e); 222 } 223 } 224 } 225 226 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 227 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 228 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 229 throws IOException, InterruptedException { 230 try { 231 if (masterClass == null) { 232 masterClass = HMaster.class; 233 } 234 if (regionserverClass == null) { 235 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class; 236 } 237 238 // start up a LocalHBaseCluster 239 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 240 masterClass, regionserverClass); 241 242 // manually add the regionservers as other users 243 for (int i = 0; i < nRegionNodes; i++) { 244 Configuration rsConf = HBaseConfiguration.create(conf); 245 if (rsPorts != null) { 246 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 247 } 248 User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++); 249 hbaseCluster.addRegionServer(rsConf, i, user); 250 } 251 252 hbaseCluster.startup(); 253 } catch (IOException e) { 254 shutdown(); 255 throw e; 256 } catch (Throwable t) { 257 LOG.error("Error starting cluster", t); 258 shutdown(); 259 throw new IOException("Shutting down", t); 260 } 261 } 262 263 @Override 264 public void startRegionServer(String hostname, int port) throws IOException { 265 final Configuration newConf = HBaseConfiguration.create(conf); 266 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 267 startRegionServer(newConf); 268 } 269 270 @Override 271 public void killRegionServer(ServerName serverName) throws IOException { 272 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 273 if (server instanceof MiniHBaseClusterRegionServer) { 274 LOG.info("Killing " + server.toString()); 275 ((MiniHBaseClusterRegionServer) server).kill(); 276 } else { 277 abortRegionServer(getRegionServerIndex(serverName)); 278 } 279 } 280 281 @Override 282 public boolean isKilledRS(ServerName serverName) { 283 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 284 } 285 286 @Override 287 public void stopRegionServer(ServerName serverName) throws IOException { 288 stopRegionServer(getRegionServerIndex(serverName)); 289 } 290 291 @Override 292 public void suspendRegionServer(ServerName serverName) throws IOException { 293 suspendRegionServer(getRegionServerIndex(serverName)); 294 } 295 296 @Override 297 public void resumeRegionServer(ServerName serverName) throws IOException { 298 resumeRegionServer(getRegionServerIndex(serverName)); 299 } 300 301 @Override 302 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 303 // ignore timeout for now 304 waitOnRegionServer(getRegionServerIndex(serverName)); 305 } 306 307 @Override 308 public void startZkNode(String hostname, int port) throws IOException { 309 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 310 } 311 312 @Override 313 public void killZkNode(ServerName serverName) throws IOException { 314 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 315 } 316 317 @Override 318 public void stopZkNode(ServerName serverName) throws IOException { 319 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 320 } 321 322 @Override 323 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 324 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 325 } 326 327 @Override 328 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 329 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 330 } 331 332 @Override 333 public void startDataNode(ServerName serverName) throws IOException { 334 LOG.warn("Starting datanodes on mini cluster is not supported"); 335 } 336 337 @Override 338 public void killDataNode(ServerName serverName) throws IOException { 339 LOG.warn("Aborting datanodes on mini cluster is not supported"); 340 } 341 342 @Override 343 public void stopDataNode(ServerName serverName) throws IOException { 344 LOG.warn("Stopping datanodes on mini cluster is not supported"); 345 } 346 347 @Override 348 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 349 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 350 } 351 352 @Override 353 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 354 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 355 } 356 357 @Override 358 public void startNameNode(ServerName serverName) throws IOException { 359 LOG.warn("Starting namenodes on mini cluster is not supported"); 360 } 361 362 @Override 363 public void killNameNode(ServerName serverName) throws IOException { 364 LOG.warn("Aborting namenodes on mini cluster is not supported"); 365 } 366 367 @Override 368 public void stopNameNode(ServerName serverName) throws IOException { 369 LOG.warn("Stopping namenodes on mini cluster is not supported"); 370 } 371 372 @Override 373 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 374 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 375 } 376 377 @Override 378 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 379 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 380 } 381 382 @Override 383 public void startMaster(String hostname, int port) throws IOException { 384 this.startMaster(); 385 } 386 387 @Override 388 public void killMaster(ServerName serverName) throws IOException { 389 abortMaster(getMasterIndex(serverName)); 390 } 391 392 @Override 393 public void stopMaster(ServerName serverName) throws IOException { 394 stopMaster(getMasterIndex(serverName)); 395 } 396 397 @Override 398 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 399 // ignore timeout for now 400 waitOnMaster(getMasterIndex(serverName)); 401 } 402 403 /** 404 * Starts a region server thread running n * @return New RegionServerThread 405 */ 406 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 407 final Configuration newConf = HBaseConfiguration.create(conf); 408 return startRegionServer(newConf); 409 } 410 411 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 412 throws IOException { 413 User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++); 414 JVMClusterUtil.RegionServerThread t = null; 415 try { 416 t = 417 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 418 t.start(); 419 t.waitForServerOnline(); 420 } catch (InterruptedException ie) { 421 throw new IOException("Interrupted adding regionserver to cluster", ie); 422 } 423 return t; 424 } 425 426 /** 427 * Starts a region server thread and waits until its processed by master. Throws an exception when 428 * it can't start a region server or when the region server is not processed by master within the 429 * timeout. 430 * @return New RegionServerThread 431 */ 432 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 433 throws IOException { 434 435 JVMClusterUtil.RegionServerThread t = startRegionServer(); 436 ServerName rsServerName = t.getRegionServer().getServerName(); 437 438 long start = EnvironmentEdgeManager.currentTime(); 439 ClusterStatus clusterStatus = getClusterStatus(); 440 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 441 if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) { 442 return t; 443 } 444 Threads.sleep(100); 445 } 446 if (t.getRegionServer().isOnline()) { 447 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 448 } else { 449 throw new IOException("RS: " + rsServerName + " is offline"); 450 } 451 } 452 453 /** 454 * Cause a region server to exit doing basic clean up only on its way out. 455 * @param serverNumber Used as index into a list. 456 */ 457 public String abortRegionServer(int serverNumber) { 458 HRegionServer server = getRegionServer(serverNumber); 459 LOG.info("Aborting " + server.toString()); 460 server.abort("Aborting for tests", new Exception("Trace info")); 461 return server.toString(); 462 } 463 464 /** 465 * Shut down the specified region server cleanly 466 * @param serverNumber Used as index into a list. 467 * @return the region server that was stopped 468 */ 469 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 470 return stopRegionServer(serverNumber, true); 471 } 472 473 /** 474 * Shut down the specified region server cleanly 475 * @param serverNumber Used as index into a list. 476 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 477 * shutdown. Usually we do but you do not want to do this if you are running 478 * multiple regionservers in a test and you shut down one before end of the 479 * test. 480 * @return the region server that was stopped 481 */ 482 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 483 final boolean shutdownFS) { 484 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 485 LOG.info("Stopping " + server.toString()); 486 server.getRegionServer().stop("Stopping rs " + serverNumber); 487 return server; 488 } 489 490 /** 491 * Suspend the specified region server 492 * @param serverNumber Used as index into a list. n 493 */ 494 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 495 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 496 LOG.info("Suspending {}", server.toString()); 497 server.suspend(); 498 return server; 499 } 500 501 /** 502 * Resume the specified region server 503 * @param serverNumber Used as index into a list. n 504 */ 505 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 506 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 507 LOG.info("Resuming {}", server.toString()); 508 server.resume(); 509 return server; 510 } 511 512 /** 513 * Wait for the specified region server to stop. Removes this thread from list of running threads. 514 * n * @return Name of region server that just went down. 515 */ 516 public String waitOnRegionServer(final int serverNumber) { 517 return this.hbaseCluster.waitOnRegionServer(serverNumber); 518 } 519 520 /** 521 * Starts a master thread running 522 * @return New RegionServerThread 523 */ 524 public JVMClusterUtil.MasterThread startMaster() throws IOException { 525 Configuration c = HBaseConfiguration.create(conf); 526 User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++); 527 528 JVMClusterUtil.MasterThread t = null; 529 try { 530 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 531 t.start(); 532 } catch (InterruptedException ie) { 533 throw new IOException("Interrupted adding master to cluster", ie); 534 } 535 conf.set(HConstants.MASTER_ADDRS_KEY, 536 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 537 return t; 538 } 539 540 /** 541 * Returns the current active master, if available. 542 * @return the active HMaster, null if none is active. 543 */ 544 @Override 545 public MasterService.BlockingInterface getMasterAdminService() { 546 return this.hbaseCluster.getActiveMaster().getMasterRpcServices(); 547 } 548 549 /** 550 * Returns the current active master, if available. 551 * @return the active HMaster, null if none is active. 552 */ 553 public HMaster getMaster() { 554 return this.hbaseCluster.getActiveMaster(); 555 } 556 557 /** 558 * Returns the current active master thread, if available. 559 * @return the active MasterThread, null if none is active. 560 */ 561 public MasterThread getMasterThread() { 562 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 563 if (mt.getMaster().isActiveMaster()) { 564 return mt; 565 } 566 } 567 return null; 568 } 569 570 /** 571 * Returns the master at the specified index, if available. 572 * @return the active HMaster, null if none is active. 573 */ 574 public HMaster getMaster(final int serverNumber) { 575 return this.hbaseCluster.getMaster(serverNumber); 576 } 577 578 /** 579 * Cause a master to exit without shutting down entire cluster. 580 * @param serverNumber Used as index into a list. 581 */ 582 public String abortMaster(int serverNumber) { 583 HMaster server = getMaster(serverNumber); 584 LOG.info("Aborting " + server.toString()); 585 server.abort("Aborting for tests", new Exception("Trace info")); 586 return server.toString(); 587 } 588 589 /** 590 * Shut down the specified master cleanly 591 * @param serverNumber Used as index into a list. 592 * @return the region server that was stopped 593 */ 594 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 595 return stopMaster(serverNumber, true); 596 } 597 598 /** 599 * Shut down the specified master cleanly 600 * @param serverNumber Used as index into a list. 601 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 602 * shutdown. Usually we do but you do not want to do this if you are running 603 * multiple master in a test and you shut down one before end of the test. 604 * @return the master that was stopped 605 */ 606 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 607 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 608 LOG.info("Stopping " + server.toString()); 609 server.getMaster().stop("Stopping master " + serverNumber); 610 return server; 611 } 612 613 /** 614 * Wait for the specified master to stop. Removes this thread from list of running threads. n 615 * * @return Name of master that just went down. 616 */ 617 public String waitOnMaster(final int serverNumber) { 618 return this.hbaseCluster.waitOnMaster(serverNumber); 619 } 620 621 /** 622 * Blocks until there is an active master and that master has completed initialization. 623 * @return true if an active master becomes available. false if there are no masters left. n 624 */ 625 @Override 626 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 627 List<JVMClusterUtil.MasterThread> mts; 628 long start = EnvironmentEdgeManager.currentTime(); 629 while ( 630 !(mts = getMasterThreads()).isEmpty() 631 && (EnvironmentEdgeManager.currentTime() - start) < timeout 632 ) { 633 for (JVMClusterUtil.MasterThread mt : mts) { 634 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 635 return true; 636 } 637 } 638 639 Threads.sleep(100); 640 } 641 return false; 642 } 643 644 /** Returns List of master threads. */ 645 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 646 return this.hbaseCluster.getMasters(); 647 } 648 649 /** Returns List of live master threads (skips the aborted and the killed) */ 650 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 651 return this.hbaseCluster.getLiveMasters(); 652 } 653 654 /** 655 * Wait for Mini HBase Cluster to shut down. 656 */ 657 public void join() { 658 this.hbaseCluster.join(); 659 } 660 661 /** 662 * Shut down the mini HBase cluster 663 */ 664 @Override 665 public void shutdown() throws IOException { 666 if (this.hbaseCluster != null) { 667 this.hbaseCluster.shutdown(); 668 } 669 } 670 671 @Override 672 public void close() throws IOException { 673 } 674 675 /** 676 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 Use 677 * {@link #getClusterMetrics()} instead. 678 */ 679 @Deprecated 680 public ClusterStatus getClusterStatus() throws IOException { 681 HMaster master = getMaster(); 682 return master == null ? null : new ClusterStatus(master.getClusterMetrics()); 683 } 684 685 @Override 686 public ClusterMetrics getClusterMetrics() throws IOException { 687 HMaster master = getMaster(); 688 return master == null ? null : master.getClusterMetrics(); 689 } 690 691 private void executeFlush(HRegion region) throws IOException { 692 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 693 return; 694 } 695 // retry 5 times if we can not flush 696 for (int i = 0; i < 5; i++) { 697 FlushResult result = region.flush(true); 698 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 699 return; 700 } 701 Threads.sleep(1000); 702 } 703 } 704 705 /** 706 * Call flushCache on all regions on all participating regionservers. 707 */ 708 public void flushcache() throws IOException { 709 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 710 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 711 executeFlush(r); 712 } 713 } 714 } 715 716 /** 717 * Call flushCache on all regions of the specified table. 718 */ 719 public void flushcache(TableName tableName) throws IOException { 720 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 721 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 722 if (r.getTableDescriptor().getTableName().equals(tableName)) { 723 executeFlush(r); 724 } 725 } 726 } 727 } 728 729 /** 730 * Call flushCache on all regions on all participating regionservers. n 731 */ 732 public void compact(boolean major) throws IOException { 733 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 734 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 735 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 736 r.compact(major); 737 } 738 } 739 } 740 } 741 742 /** 743 * Call flushCache on all regions of the specified table. n 744 */ 745 public void compact(TableName tableName, boolean major) throws IOException { 746 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 747 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 748 if (r.getTableDescriptor().getTableName().equals(tableName)) { 749 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 750 r.compact(major); 751 } 752 } 753 } 754 } 755 } 756 757 /** Returns Number of live region servers in the cluster currently. */ 758 public int getNumLiveRegionServers() { 759 return this.hbaseCluster.getLiveRegionServers().size(); 760 } 761 762 /** 763 * @return List of region server threads. Does not return the master even though it is also a 764 * region server. 765 */ 766 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 767 return this.hbaseCluster.getRegionServers(); 768 } 769 770 /** Returns List of live region server threads (skips the aborted and the killed) */ 771 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 772 return this.hbaseCluster.getLiveRegionServers(); 773 } 774 775 /** 776 * Grab a numbered region server of your choice. n * @return region server 777 */ 778 public HRegionServer getRegionServer(int serverNumber) { 779 return hbaseCluster.getRegionServer(serverNumber); 780 } 781 782 public HRegionServer getRegionServer(ServerName serverName) { 783 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 784 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 785 } 786 787 public List<HRegion> getRegions(byte[] tableName) { 788 return getRegions(TableName.valueOf(tableName)); 789 } 790 791 public List<HRegion> getRegions(TableName tableName) { 792 List<HRegion> ret = new ArrayList<>(); 793 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 794 HRegionServer hrs = rst.getRegionServer(); 795 for (Region region : hrs.getOnlineRegionsLocalContext()) { 796 if (region.getTableDescriptor().getTableName().equals(tableName)) { 797 ret.add((HRegion) region); 798 } 799 } 800 } 801 return ret; 802 } 803 804 /** 805 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 806 * regionName. Returns -1 if none found. 807 */ 808 public int getServerWithMeta() { 809 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); 810 } 811 812 /** 813 * Get the location of the specified region 814 * @param regionName Name of the region in bytes 815 * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying 816 * hbase:meta. Returns -1 if none found. 817 */ 818 public int getServerWith(byte[] regionName) { 819 int index = -1; 820 int count = 0; 821 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 822 HRegionServer hrs = rst.getRegionServer(); 823 if (!hrs.isStopped()) { 824 Region region = hrs.getOnlineRegion(regionName); 825 if (region != null) { 826 index = count; 827 break; 828 } 829 } 830 count++; 831 } 832 return index; 833 } 834 835 @Override 836 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 837 throws IOException { 838 // Assume there is only one master thread which is the active master. 839 // If there are multiple master threads, the backup master threads 840 // should hold some regions. Please refer to #countServedRegions 841 // to see how we find out all regions. 842 HMaster master = getMaster(); 843 Region region = master.getOnlineRegion(regionName); 844 if (region != null) { 845 return master.getServerName(); 846 } 847 int index = getServerWith(regionName); 848 if (index < 0) { 849 return null; 850 } 851 return getRegionServer(index).getServerName(); 852 } 853 854 /** 855 * Counts the total numbers of regions being served by the currently online region servers by 856 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 857 * catalog tables. 858 * @return number of regions being served by all region servers 859 */ 860 public long countServedRegions() { 861 long count = 0; 862 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 863 count += rst.getRegionServer().getNumberOfOnlineRegions(); 864 } 865 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) { 866 count += mt.getMaster().getNumberOfOnlineRegions(); 867 } 868 return count; 869 } 870 871 /** 872 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 873 * mini-cluster back for clean shutdown. 874 */ 875 public void killAll() { 876 // Do backups first. 877 MasterThread activeMaster = null; 878 for (MasterThread masterThread : getMasterThreads()) { 879 if (!masterThread.getMaster().isActiveMaster()) { 880 masterThread.getMaster().abort("killAll"); 881 } else { 882 activeMaster = masterThread; 883 } 884 } 885 // Do active after. 886 if (activeMaster != null) { 887 activeMaster.getMaster().abort("killAll"); 888 } 889 for (RegionServerThread rst : getRegionServerThreads()) { 890 rst.getRegionServer().abort("killAll"); 891 } 892 } 893 894 @Override 895 public void waitUntilShutDown() { 896 this.hbaseCluster.join(); 897 } 898 899 public List<HRegion> findRegionsForTable(TableName tableName) { 900 ArrayList<HRegion> ret = new ArrayList<>(); 901 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 902 HRegionServer hrs = rst.getRegionServer(); 903 for (Region region : hrs.getRegions(tableName)) { 904 if (region.getTableDescriptor().getTableName().equals(tableName)) { 905 ret.add((HRegion) region); 906 } 907 } 908 } 909 return ret; 910 } 911 912 protected int getRegionServerIndex(ServerName serverName) { 913 // we have a small number of region servers, this should be fine for now. 914 List<RegionServerThread> servers = getRegionServerThreads(); 915 for (int i = 0; i < servers.size(); i++) { 916 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 917 return i; 918 } 919 } 920 return -1; 921 } 922 923 protected int getMasterIndex(ServerName serverName) { 924 List<MasterThread> masters = getMasterThreads(); 925 for (int i = 0; i < masters.size(); i++) { 926 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 927 return i; 928 } 929 } 930 return -1; 931 } 932 933 @Override 934 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException { 935 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 936 } 937 938 @Override 939 public ClientService.BlockingInterface getClientProtocol(ServerName serverName) 940 throws IOException { 941 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices(); 942 } 943}