001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.RegionInfoBuilder; 029import org.apache.hadoop.hbase.client.RegionReplicaUtil; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.JVMClusterUtil; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.apache.yetus.audience.InterfaceStability; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 048 049/** 050 * This class creates a single process HBase cluster. each server. The master uses the 'default' 051 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 052 * instance each and will close down their instance on the way out. 053 */ 054@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 055@InterfaceStability.Evolving 056public class SingleProcessHBaseCluster extends HBaseClusterInterface { 057 private static final Logger LOG = 058 LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 */ 067 public SingleProcessHBaseCluster(Configuration conf, int numRegionServers) 068 throws IOException, InterruptedException { 069 this(conf, 1, numRegionServers); 070 } 071 072 /** 073 * Start a MiniHBaseCluster. 074 * @param conf Configuration to be used for cluster 075 * @param numMasters initial number of masters to start. 076 * @param numRegionServers initial number of region servers to start. 077 */ 078 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 079 throws IOException, InterruptedException { 080 this(conf, numMasters, numRegionServers, null, null); 081 } 082 083 /** 084 * Start a MiniHBaseCluster. 085 * @param conf Configuration to be used for cluster 086 * @param numMasters initial number of masters to start. 087 * @param numRegionServers initial number of region servers to start. 088 */ 089 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 090 Class<? extends HMaster> masterClass, 091 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 092 throws IOException, InterruptedException { 093 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 094 } 095 096 /** 097 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 098 * restart where for sure the regionservers come up on same address+port (but just 099 * with different startcode); by default mini hbase clusters choose new arbitrary 100 * ports on each cluster start. 101 */ 102 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 103 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 104 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 105 throws IOException, InterruptedException { 106 super(conf); 107 108 // Hadoop 2 109 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 110 111 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 112 regionserverClass); 113 this.initialClusterStatus = getClusterMetrics(); 114 } 115 116 public Configuration getConfiguration() { 117 return this.conf; 118 } 119 120 /** 121 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 122 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 123 * only, not All filesystems as the FileSystem system exit hook does. 124 */ 125 public static class MiniHBaseClusterRegionServer extends HRegionServer { 126 private Thread shutdownThread = null; 127 private User user = null; 128 /** 129 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 130 * restarted instances of the same server will have different ServerName and will not coincide 131 * with past dead ones. So there's no need to cleanup this list. 132 */ 133 static Set<ServerName> killedServers = new HashSet<>(); 134 135 public MiniHBaseClusterRegionServer(Configuration conf) 136 throws IOException, InterruptedException { 137 super(conf); 138 this.user = User.getCurrent(); 139 } 140 141 @Override 142 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 143 throws IOException { 144 super.handleReportForDutyResponse(c); 145 // Run this thread to shutdown our filesystem on way out. 146 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 147 } 148 149 @Override 150 public void run() { 151 try { 152 this.user.runAs(new PrivilegedAction<Object>() { 153 @Override 154 public Object run() { 155 runRegionServer(); 156 return null; 157 } 158 }); 159 } catch (Throwable t) { 160 LOG.error("Exception in run", t); 161 } finally { 162 // Run this on the way out. 163 if (this.shutdownThread != null) { 164 this.shutdownThread.start(); 165 Threads.shutdown(this.shutdownThread, 30000); 166 } 167 } 168 } 169 170 private void runRegionServer() { 171 super.run(); 172 } 173 174 @Override 175 protected void kill() { 176 killedServers.add(getServerName()); 177 super.kill(); 178 } 179 180 @Override 181 public void abort(final String reason, final Throwable cause) { 182 this.user.runAs(new PrivilegedAction<Object>() { 183 @Override 184 public Object run() { 185 abortRegionServer(reason, cause); 186 return null; 187 } 188 }); 189 } 190 191 private void abortRegionServer(String reason, Throwable cause) { 192 super.abort(reason, cause); 193 } 194 } 195 196 /** 197 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook 198 * does. 199 */ 200 static class SingleFileSystemShutdownThread extends Thread { 201 private final FileSystem fs; 202 203 SingleFileSystemShutdownThread(final FileSystem fs) { 204 super("Shutdown of " + fs); 205 this.fs = fs; 206 } 207 208 @Override 209 public void run() { 210 try { 211 LOG.info("Hook closing fs=" + this.fs); 212 this.fs.close(); 213 } catch (NullPointerException npe) { 214 LOG.debug("Need to fix these: " + npe.toString()); 215 } catch (IOException e) { 216 LOG.warn("Running hook", e); 217 } 218 } 219 } 220 221 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 222 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 223 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 224 throws IOException, InterruptedException { 225 try { 226 if (masterClass == null) { 227 masterClass = HMaster.class; 228 } 229 if (regionserverClass == null) { 230 regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class; 231 } 232 233 // start up a LocalHBaseCluster 234 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 235 masterClass, regionserverClass); 236 237 // manually add the regionservers as other users 238 for (int i = 0; i < nRegionNodes; i++) { 239 Configuration rsConf = HBaseConfiguration.create(conf); 240 if (rsPorts != null) { 241 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 242 } 243 User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++); 244 hbaseCluster.addRegionServer(rsConf, i, user); 245 } 246 247 hbaseCluster.startup(); 248 } catch (IOException e) { 249 shutdown(); 250 throw e; 251 } catch (Throwable t) { 252 LOG.error("Error starting cluster", t); 253 shutdown(); 254 throw new IOException("Shutting down", t); 255 } 256 } 257 258 @Override 259 public void startRegionServer(String hostname, int port) throws IOException { 260 final Configuration newConf = HBaseConfiguration.create(conf); 261 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 262 startRegionServer(newConf); 263 } 264 265 @Override 266 public void killRegionServer(ServerName serverName) throws IOException { 267 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 268 if (server instanceof MiniHBaseClusterRegionServer) { 269 LOG.info("Killing " + server.toString()); 270 ((MiniHBaseClusterRegionServer) server).kill(); 271 } else { 272 abortRegionServer(getRegionServerIndex(serverName)); 273 } 274 } 275 276 @Override 277 public boolean isKilledRS(ServerName serverName) { 278 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 279 } 280 281 @Override 282 public void stopRegionServer(ServerName serverName) throws IOException { 283 stopRegionServer(getRegionServerIndex(serverName)); 284 } 285 286 @Override 287 public void suspendRegionServer(ServerName serverName) throws IOException { 288 suspendRegionServer(getRegionServerIndex(serverName)); 289 } 290 291 @Override 292 public void resumeRegionServer(ServerName serverName) throws IOException { 293 resumeRegionServer(getRegionServerIndex(serverName)); 294 } 295 296 @Override 297 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 298 // ignore timeout for now 299 waitOnRegionServer(getRegionServerIndex(serverName)); 300 } 301 302 @Override 303 public void startZkNode(String hostname, int port) throws IOException { 304 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 305 } 306 307 @Override 308 public void killZkNode(ServerName serverName) throws IOException { 309 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 310 } 311 312 @Override 313 public void stopZkNode(ServerName serverName) throws IOException { 314 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 315 } 316 317 @Override 318 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 319 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 320 } 321 322 @Override 323 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 324 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 325 } 326 327 @Override 328 public void startDataNode(ServerName serverName) throws IOException { 329 LOG.warn("Starting datanodes on mini cluster is not supported"); 330 } 331 332 @Override 333 public void killDataNode(ServerName serverName) throws IOException { 334 LOG.warn("Aborting datanodes on mini cluster is not supported"); 335 } 336 337 @Override 338 public void stopDataNode(ServerName serverName) throws IOException { 339 LOG.warn("Stopping datanodes on mini cluster is not supported"); 340 } 341 342 @Override 343 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 344 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 345 } 346 347 @Override 348 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 349 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 350 } 351 352 @Override 353 public void startNameNode(ServerName serverName) throws IOException { 354 LOG.warn("Starting namenodes on mini cluster is not supported"); 355 } 356 357 @Override 358 public void killNameNode(ServerName serverName) throws IOException { 359 LOG.warn("Aborting namenodes on mini cluster is not supported"); 360 } 361 362 @Override 363 public void stopNameNode(ServerName serverName) throws IOException { 364 LOG.warn("Stopping namenodes on mini cluster is not supported"); 365 } 366 367 @Override 368 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 369 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 370 } 371 372 @Override 373 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 374 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 375 } 376 377 @Override 378 public void startMaster(String hostname, int port) throws IOException { 379 this.startMaster(); 380 } 381 382 @Override 383 public void killMaster(ServerName serverName) throws IOException { 384 abortMaster(getMasterIndex(serverName)); 385 } 386 387 @Override 388 public void stopMaster(ServerName serverName) throws IOException { 389 stopMaster(getMasterIndex(serverName)); 390 } 391 392 @Override 393 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 394 // ignore timeout for now 395 waitOnMaster(getMasterIndex(serverName)); 396 } 397 398 /** 399 * Starts a region server thread running 400 * @return New RegionServerThread 401 */ 402 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 403 final Configuration newConf = HBaseConfiguration.create(conf); 404 return startRegionServer(newConf); 405 } 406 407 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 408 throws IOException { 409 User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++); 410 JVMClusterUtil.RegionServerThread t = null; 411 try { 412 t = 413 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 414 t.start(); 415 t.waitForServerOnline(); 416 } catch (InterruptedException ie) { 417 throw new IOException("Interrupted adding regionserver to cluster", ie); 418 } 419 return t; 420 } 421 422 /** 423 * Starts a region server thread and waits until its processed by master. Throws an exception when 424 * it can't start a region server or when the region server is not processed by master within the 425 * timeout. 426 * @return New RegionServerThread 427 */ 428 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 429 throws IOException { 430 431 JVMClusterUtil.RegionServerThread t = startRegionServer(); 432 ServerName rsServerName = t.getRegionServer().getServerName(); 433 434 long start = EnvironmentEdgeManager.currentTime(); 435 ClusterMetrics clusterStatus = getClusterMetrics(); 436 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 437 if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) { 438 return t; 439 } 440 Threads.sleep(100); 441 } 442 if (t.getRegionServer().isOnline()) { 443 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 444 } else { 445 throw new IOException("RS: " + rsServerName + " is offline"); 446 } 447 } 448 449 /** 450 * Cause a region server to exit doing basic clean up only on its way out. 451 * @param serverNumber Used as index into a list. 452 */ 453 public String abortRegionServer(int serverNumber) { 454 HRegionServer server = getRegionServer(serverNumber); 455 LOG.info("Aborting " + server.toString()); 456 server.abort("Aborting for tests", new Exception("Trace info")); 457 return server.toString(); 458 } 459 460 /** 461 * Shut down the specified region server cleanly 462 * @param serverNumber Used as index into a list. 463 * @return the region server that was stopped 464 */ 465 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 466 return stopRegionServer(serverNumber, true); 467 } 468 469 /** 470 * Shut down the specified region server cleanly 471 * @param serverNumber Used as index into a list. 472 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 473 * shutdown. Usually we do but you do not want to do this if you are running 474 * multiple regionservers in a test and you shut down one before end of the 475 * test. 476 * @return the region server that was stopped 477 */ 478 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 479 final boolean shutdownFS) { 480 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 481 LOG.info("Stopping " + server.toString()); 482 server.getRegionServer().stop("Stopping rs " + serverNumber); 483 return server; 484 } 485 486 /** 487 * Suspend the specified region server 488 * @param serverNumber Used as index into a list. 489 */ 490 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 491 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 492 LOG.info("Suspending {}", server.toString()); 493 server.suspend(); 494 return server; 495 } 496 497 /** 498 * Resume the specified region server 499 * @param serverNumber Used as index into a list. 500 */ 501 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 502 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 503 LOG.info("Resuming {}", server.toString()); 504 server.resume(); 505 return server; 506 } 507 508 /** 509 * Wait for the specified region server to stop. Removes this thread from list of running threads. 510 * @return Name of region server that just went down. 511 */ 512 public String waitOnRegionServer(final int serverNumber) { 513 return this.hbaseCluster.waitOnRegionServer(serverNumber); 514 } 515 516 /** 517 * Starts a master thread running 518 * @return New RegionServerThread 519 */ 520 public JVMClusterUtil.MasterThread startMaster() throws IOException { 521 Configuration c = HBaseConfiguration.create(conf); 522 User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++); 523 524 JVMClusterUtil.MasterThread t = null; 525 try { 526 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 527 t.start(); 528 } catch (InterruptedException ie) { 529 throw new IOException("Interrupted adding master to cluster", ie); 530 } 531 conf.set(HConstants.MASTER_ADDRS_KEY, 532 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 533 return t; 534 } 535 536 /** 537 * Returns the current active master, if available. 538 * @return the active HMaster, null if none is active. 539 */ 540 public HMaster getMaster() { 541 return this.hbaseCluster.getActiveMaster(); 542 } 543 544 /** 545 * Returns the current active master thread, if available. 546 * @return the active MasterThread, null if none is active. 547 */ 548 public MasterThread getMasterThread() { 549 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 550 if (mt.getMaster().isActiveMaster()) { 551 return mt; 552 } 553 } 554 return null; 555 } 556 557 /** 558 * Returns the master at the specified index, if available. 559 * @return the active HMaster, null if none is active. 560 */ 561 public HMaster getMaster(final int serverNumber) { 562 return this.hbaseCluster.getMaster(serverNumber); 563 } 564 565 /** 566 * Cause a master to exit without shutting down entire cluster. 567 * @param serverNumber Used as index into a list. 568 */ 569 public String abortMaster(int serverNumber) { 570 HMaster server = getMaster(serverNumber); 571 LOG.info("Aborting " + server.toString()); 572 server.abort("Aborting for tests", new Exception("Trace info")); 573 return server.toString(); 574 } 575 576 /** 577 * Shut down the specified master cleanly 578 * @param serverNumber Used as index into a list. 579 * @return the region server that was stopped 580 */ 581 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 582 return stopMaster(serverNumber, true); 583 } 584 585 /** 586 * Shut down the specified master cleanly 587 * @param serverNumber Used as index into a list. 588 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 589 * shutdown. Usually we do but you do not want to do this if you are running 590 * multiple master in a test and you shut down one before end of the test. 591 * @return the master that was stopped 592 */ 593 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 594 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 595 LOG.info("Stopping " + server.toString()); 596 server.getMaster().stop("Stopping master " + serverNumber); 597 return server; 598 } 599 600 /** 601 * Wait for the specified master to stop. Removes this thread from list of running threads. 602 * @return Name of master that just went down. 603 */ 604 public String waitOnMaster(final int serverNumber) { 605 return this.hbaseCluster.waitOnMaster(serverNumber); 606 } 607 608 /** 609 * Blocks until there is an active master and that master has completed initialization. 610 * @return true if an active master becomes available. false if there are no masters left. 611 */ 612 @Override 613 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 614 long start = EnvironmentEdgeManager.currentTime(); 615 while (EnvironmentEdgeManager.currentTime() - start < timeout) { 616 for (JVMClusterUtil.MasterThread mt : getMasterThreads()) { 617 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 618 return true; 619 } 620 } 621 Threads.sleep(100); 622 } 623 return false; 624 } 625 626 /** 627 * Returns list of master threads. 628 */ 629 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 630 return this.hbaseCluster.getMasters(); 631 } 632 633 /** 634 * Returns list of live master threads (skips the aborted and the killed) 635 */ 636 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 637 return this.hbaseCluster.getLiveMasters(); 638 } 639 640 /** 641 * Wait for Mini HBase Cluster to shut down. 642 */ 643 public void join() { 644 this.hbaseCluster.join(); 645 } 646 647 /** 648 * Shut down the mini HBase cluster 649 */ 650 @Override 651 public void shutdown() throws IOException { 652 if (this.hbaseCluster != null) { 653 this.hbaseCluster.shutdown(); 654 } 655 } 656 657 @Override 658 public void close() throws IOException { 659 } 660 661 @Override 662 public ClusterMetrics getClusterMetrics() throws IOException { 663 HMaster master = getMaster(); 664 return master == null ? null : master.getClusterMetrics(); 665 } 666 667 private void executeFlush(HRegion region) throws IOException { 668 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 669 return; 670 } 671 // retry 5 times if we can not flush 672 for (int i = 0; i < 5; i++) { 673 FlushResult result = region.flush(true); 674 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 675 return; 676 } 677 Threads.sleep(1000); 678 } 679 } 680 681 /** 682 * Call flushCache on all regions on all participating regionservers. 683 */ 684 public void flushcache() throws IOException { 685 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 686 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 687 executeFlush(r); 688 } 689 } 690 } 691 692 /** 693 * Call flushCache on all regions of the specified table. 694 */ 695 public void flushcache(TableName tableName) throws IOException { 696 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 697 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 698 if (r.getTableDescriptor().getTableName().equals(tableName)) { 699 executeFlush(r); 700 } 701 } 702 } 703 } 704 705 /** 706 * Call flushCache on all regions on all participating regionservers. 707 */ 708 public void compact(boolean major) throws IOException { 709 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 710 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 711 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 712 r.compact(major); 713 } 714 } 715 } 716 } 717 718 /** 719 * Call flushCache on all regions of the specified table. 720 */ 721 public void compact(TableName tableName, boolean major) throws IOException { 722 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 723 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 724 if (r.getTableDescriptor().getTableName().equals(tableName)) { 725 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 726 r.compact(major); 727 } 728 } 729 } 730 } 731 } 732 733 /** 734 * Returns number of live region servers in the cluster currently. 735 */ 736 public int getNumLiveRegionServers() { 737 return this.hbaseCluster.getLiveRegionServers().size(); 738 } 739 740 /** 741 * Returns list of region server threads. Does not return the master even though it is also a 742 * region server. 743 */ 744 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 745 return this.hbaseCluster.getRegionServers(); 746 } 747 748 /** 749 * Returns List of live region server threads (skips the aborted and the killed) 750 */ 751 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 752 return this.hbaseCluster.getLiveRegionServers(); 753 } 754 755 /** 756 * Grab a numbered region server of your choice. 757 * @return region server 758 */ 759 public HRegionServer getRegionServer(int serverNumber) { 760 return hbaseCluster.getRegionServer(serverNumber); 761 } 762 763 public HRegionServer getRegionServer(ServerName serverName) { 764 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 765 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 766 } 767 768 public List<HRegion> getRegions(byte[] tableName) { 769 return getRegions(TableName.valueOf(tableName)); 770 } 771 772 public List<HRegion> getRegions(TableName tableName) { 773 List<HRegion> ret = new ArrayList<>(); 774 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 775 HRegionServer hrs = rst.getRegionServer(); 776 for (Region region : hrs.getOnlineRegionsLocalContext()) { 777 if (region.getTableDescriptor().getTableName().equals(tableName)) { 778 ret.add((HRegion) region); 779 } 780 } 781 } 782 return ret; 783 } 784 785 /** 786 * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 787 * carrying regionName. Returns -1 if none found. 788 */ 789 public int getServerWithMeta() { 790 return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 791 } 792 793 /** 794 * Get the location of the specified region 795 * @param regionName Name of the region in bytes 796 * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 797 * carrying hbase:meta. Returns -1 if none found. 798 */ 799 public int getServerWith(byte[] regionName) { 800 int index = 0; 801 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 802 HRegionServer hrs = rst.getRegionServer(); 803 if (!hrs.isStopped()) { 804 Region region = hrs.getOnlineRegion(regionName); 805 if (region != null) { 806 return index; 807 } 808 } 809 index++; 810 } 811 return -1; 812 } 813 814 @Override 815 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 816 throws IOException { 817 int index = getServerWith(regionName); 818 if (index < 0) { 819 return null; 820 } 821 return getRegionServer(index).getServerName(); 822 } 823 824 /** 825 * Counts the total numbers of regions being served by the currently online region servers by 826 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 827 * catalog tables. 828 * @return number of regions being served by all region servers 829 */ 830 public long countServedRegions() { 831 long count = 0; 832 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 833 count += rst.getRegionServer().getNumberOfOnlineRegions(); 834 } 835 return count; 836 } 837 838 /** 839 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 840 * mini-cluster back for clean shutdown. 841 */ 842 public void killAll() { 843 // Do backups first. 844 MasterThread activeMaster = null; 845 for (MasterThread masterThread : getMasterThreads()) { 846 if (!masterThread.getMaster().isActiveMaster()) { 847 masterThread.getMaster().abort("killAll"); 848 } else { 849 activeMaster = masterThread; 850 } 851 } 852 // Do active after. 853 if (activeMaster != null) { 854 activeMaster.getMaster().abort("killAll"); 855 } 856 for (RegionServerThread rst : getRegionServerThreads()) { 857 rst.getRegionServer().abort("killAll"); 858 } 859 } 860 861 @Override 862 public void waitUntilShutDown() { 863 this.hbaseCluster.join(); 864 } 865 866 public List<HRegion> findRegionsForTable(TableName tableName) { 867 ArrayList<HRegion> ret = new ArrayList<>(); 868 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 869 HRegionServer hrs = rst.getRegionServer(); 870 for (Region region : hrs.getRegions(tableName)) { 871 if (region.getTableDescriptor().getTableName().equals(tableName)) { 872 ret.add((HRegion) region); 873 } 874 } 875 } 876 return ret; 877 } 878 879 protected int getRegionServerIndex(ServerName serverName) { 880 // we have a small number of region servers, this should be fine for now. 881 List<RegionServerThread> servers = getRegionServerThreads(); 882 for (int i = 0; i < servers.size(); i++) { 883 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 884 return i; 885 } 886 } 887 return -1; 888 } 889 890 protected int getMasterIndex(ServerName serverName) { 891 List<MasterThread> masters = getMasterThreads(); 892 for (int i = 0; i < masters.size(); i++) { 893 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 894 return i; 895 } 896 } 897 return -1; 898 } 899}