001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.client.RegionInfoBuilder; 028import org.apache.hadoop.hbase.client.RegionReplicaUtil; 029import org.apache.hadoop.hbase.master.HMaster; 030import org.apache.hadoop.hbase.regionserver.HRegion; 031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.regionserver.Region; 034import org.apache.hadoop.hbase.security.User; 035import org.apache.hadoop.hbase.test.MetricsAssertHelper; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.JVMClusterUtil; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * This class creates a single process HBase cluster. each server. The master uses the 'default' 048 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 049 * instance each and will close down their instance on the way out. 050 */ 051@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 052@InterfaceStability.Evolving 053public class SingleProcessHBaseCluster extends HBaseClusterInterface { 054 private static final Logger LOG = 055 LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName()); 056 public LocalHBaseCluster hbaseCluster; 057 private static int index; 058 059 /** 060 * Start a MiniHBaseCluster. 061 * @param conf Configuration to be used for cluster 062 * @param numRegionServers initial number of region servers to start. 063 */ 064 public SingleProcessHBaseCluster(Configuration conf, int numRegionServers) 065 throws IOException, InterruptedException { 066 this(conf, 1, numRegionServers); 067 } 068 069 /** 070 * Start a MiniHBaseCluster. 071 * @param conf Configuration to be used for cluster 072 * @param numMasters initial number of masters to start. 073 * @param numRegionServers initial number of region servers to start. 074 */ 075 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 076 throws IOException, InterruptedException { 077 this(conf, numMasters, numRegionServers, null, null); 078 } 079 080 /** 081 * Start a MiniHBaseCluster. 082 * @param conf Configuration to be used for cluster 083 * @param numMasters initial number of masters to start. 084 * @param numRegionServers initial number of region servers to start. 085 */ 086 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 087 Class<? extends HMaster> masterClass, 088 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 089 throws IOException, InterruptedException { 090 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 091 } 092 093 /** 094 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 095 * restart where for sure the regionservers come up on same address+port (but just 096 * with different startcode); by default mini hbase clusters choose new arbitrary 097 * ports on each cluster start. 098 */ 099 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 100 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 101 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 102 throws IOException, InterruptedException { 103 super(conf); 104 105 // Hadoop 2 106 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 107 108 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 109 regionserverClass); 110 this.initialClusterStatus = getClusterMetrics(); 111 } 112 113 public Configuration getConfiguration() { 114 return this.conf; 115 } 116 117 /** 118 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 119 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 120 * only, not All filesystems as the FileSystem system exit hook does. 121 */ 122 public static class MiniHBaseClusterRegionServer extends HRegionServer { 123 124 private User user = null; 125 /** 126 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 127 * restarted instances of the same server will have different ServerName and will not coincide 128 * with past dead ones. So there's no need to cleanup this list. 129 */ 130 static Set<ServerName> killedServers = new HashSet<>(); 131 132 public MiniHBaseClusterRegionServer(Configuration conf) 133 throws IOException, InterruptedException { 134 super(conf); 135 this.user = User.getCurrent(); 136 } 137 138 @Override 139 public void run() { 140 try { 141 this.user.runAs(new PrivilegedAction<Object>() { 142 @Override 143 public Object run() { 144 runRegionServer(); 145 return null; 146 } 147 }); 148 } catch (Throwable t) { 149 LOG.error("Exception in run", t); 150 } 151 } 152 153 private void runRegionServer() { 154 super.run(); 155 } 156 157 @Override 158 protected void kill() { 159 killedServers.add(getServerName()); 160 super.kill(); 161 } 162 163 @Override 164 public void abort(final String reason, final Throwable cause) { 165 this.user.runAs(new PrivilegedAction<Object>() { 166 @Override 167 public Object run() { 168 abortRegionServer(reason, cause); 169 return null; 170 } 171 }); 172 } 173 174 private void abortRegionServer(String reason, Throwable cause) { 175 super.abort(reason, cause); 176 } 177 } 178 179 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 180 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 181 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 182 throws IOException, InterruptedException { 183 try { 184 if (masterClass == null) { 185 masterClass = HMaster.class; 186 } 187 if (regionserverClass == null) { 188 regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class; 189 } 190 191 // start up a LocalHBaseCluster 192 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 193 masterClass, regionserverClass); 194 195 // manually add the regionservers as other users 196 for (int i = 0; i < nRegionNodes; i++) { 197 Configuration rsConf = HBaseConfiguration.create(conf); 198 if (rsPorts != null) { 199 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 200 } 201 User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++); 202 hbaseCluster.addRegionServer(rsConf, i, user); 203 } 204 205 hbaseCluster.startup(); 206 } catch (IOException e) { 207 shutdown(); 208 throw e; 209 } catch (Throwable t) { 210 LOG.error("Error starting cluster", t); 211 shutdown(); 212 throw new IOException("Shutting down", t); 213 } 214 } 215 216 @Override 217 public void startRegionServer(String hostname, int port) throws IOException { 218 final Configuration newConf = HBaseConfiguration.create(conf); 219 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 220 startRegionServer(newConf); 221 } 222 223 @Override 224 public void killRegionServer(ServerName serverName) throws IOException { 225 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 226 if (server instanceof MiniHBaseClusterRegionServer) { 227 LOG.info("Killing " + server.toString()); 228 ((MiniHBaseClusterRegionServer) server).kill(); 229 } else { 230 abortRegionServer(getRegionServerIndex(serverName)); 231 } 232 } 233 234 @Override 235 public boolean isKilledRS(ServerName serverName) { 236 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 237 } 238 239 @Override 240 public void stopRegionServer(ServerName serverName) throws IOException { 241 stopRegionServer(getRegionServerIndex(serverName)); 242 } 243 244 @Override 245 public void suspendRegionServer(ServerName serverName) throws IOException { 246 suspendRegionServer(getRegionServerIndex(serverName)); 247 } 248 249 @Override 250 public void resumeRegionServer(ServerName serverName) throws IOException { 251 resumeRegionServer(getRegionServerIndex(serverName)); 252 } 253 254 @Override 255 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 256 // ignore timeout for now 257 waitOnRegionServer(getRegionServerIndex(serverName)); 258 } 259 260 @Override 261 public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException { 262 LOG.warn("Waiting for regionserver to suspend on mini cluster is not supported"); 263 } 264 265 @Override 266 public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException { 267 LOG.warn("Waiting for regionserver to resume on mini cluster is not supported"); 268 } 269 270 @Override 271 public void startZkNode(String hostname, int port) throws IOException { 272 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 273 } 274 275 @Override 276 public void killZkNode(ServerName serverName) throws IOException { 277 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 278 } 279 280 @Override 281 public void stopZkNode(ServerName serverName) throws IOException { 282 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 283 } 284 285 @Override 286 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 287 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 288 } 289 290 @Override 291 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 292 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 293 } 294 295 @Override 296 public void startDataNode(ServerName serverName) throws IOException { 297 LOG.warn("Starting datanodes on mini cluster is not supported"); 298 } 299 300 @Override 301 public void killDataNode(ServerName serverName) throws IOException { 302 LOG.warn("Aborting datanodes on mini cluster is not supported"); 303 } 304 305 @Override 306 public void stopDataNode(ServerName serverName) throws IOException { 307 LOG.warn("Stopping datanodes on mini cluster is not supported"); 308 } 309 310 @Override 311 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 312 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 313 } 314 315 @Override 316 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 317 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 318 } 319 320 @Override 321 public void startNameNode(ServerName serverName) throws IOException { 322 LOG.warn("Starting namenodes on mini cluster is not supported"); 323 } 324 325 @Override 326 public void killNameNode(ServerName serverName) throws IOException { 327 LOG.warn("Aborting namenodes on mini cluster is not supported"); 328 } 329 330 @Override 331 public void stopNameNode(ServerName serverName) throws IOException { 332 LOG.warn("Stopping namenodes on mini cluster is not supported"); 333 } 334 335 @Override 336 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 337 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 338 } 339 340 @Override 341 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 342 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 343 } 344 345 @Override 346 public void startJournalNode(ServerName serverName) { 347 LOG.warn("Starting journalnodes on mini cluster is not supported"); 348 } 349 350 @Override 351 public void killJournalNode(ServerName serverName) { 352 LOG.warn("Aborting journalnodes on mini cluster is not supported"); 353 } 354 355 @Override 356 public void stopJournalNode(ServerName serverName) { 357 LOG.warn("Stopping journalnodes on mini cluster is not supported"); 358 } 359 360 @Override 361 public void waitForJournalNodeToStart(ServerName serverName, long timeout) { 362 LOG.warn("Waiting for journalnodes to start on mini cluster is not supported"); 363 } 364 365 @Override 366 public void waitForJournalNodeToStop(ServerName serverName, long timeout) { 367 LOG.warn("Waiting for journalnodes to stop on mini cluster is not supported"); 368 } 369 370 @Override 371 public void startMaster(String hostname, int port) throws IOException { 372 this.startMaster(); 373 } 374 375 @Override 376 public void killMaster(ServerName serverName) throws IOException { 377 abortMaster(getMasterIndex(serverName)); 378 } 379 380 @Override 381 public void stopMaster(ServerName serverName) throws IOException { 382 stopMaster(getMasterIndex(serverName)); 383 } 384 385 @Override 386 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 387 // ignore timeout for now 388 waitOnMaster(getMasterIndex(serverName)); 389 } 390 391 /** 392 * Starts a region server thread running 393 * @return New RegionServerThread 394 */ 395 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 396 final Configuration newConf = HBaseConfiguration.create(conf); 397 return startRegionServer(newConf); 398 } 399 400 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 401 throws IOException { 402 User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++); 403 JVMClusterUtil.RegionServerThread t = null; 404 try { 405 t = 406 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 407 t.start(); 408 t.waitForServerOnline(); 409 } catch (InterruptedException ie) { 410 throw new IOException("Interrupted adding regionserver to cluster", ie); 411 } 412 return t; 413 } 414 415 /** 416 * Starts a region server thread and waits until its processed by master. Throws an exception when 417 * it can't start a region server or when the region server is not processed by master within the 418 * timeout. 419 * @return New RegionServerThread 420 */ 421 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 422 throws IOException { 423 424 JVMClusterUtil.RegionServerThread t = startRegionServer(); 425 ServerName rsServerName = t.getRegionServer().getServerName(); 426 427 long start = EnvironmentEdgeManager.currentTime(); 428 ClusterMetrics clusterStatus = getClusterMetrics(); 429 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 430 if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) { 431 return t; 432 } 433 Threads.sleep(100); 434 } 435 if (t.getRegionServer().isOnline()) { 436 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 437 } else { 438 throw new IOException("RS: " + rsServerName + " is offline"); 439 } 440 } 441 442 /** 443 * Cause a region server to exit doing basic clean up only on its way out. 444 * @param serverNumber Used as index into a list. 445 */ 446 public String abortRegionServer(int serverNumber) { 447 HRegionServer server = getRegionServer(serverNumber); 448 LOG.info("Aborting " + server.toString()); 449 server.abort("Aborting for tests", new Exception("Trace info")); 450 return server.toString(); 451 } 452 453 /** 454 * Shut down the specified region server cleanly 455 * @param serverNumber Used as index into a list. 456 * @return the region server that was stopped 457 */ 458 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 459 return stopRegionServer(serverNumber, true); 460 } 461 462 /** 463 * Shut down the specified region server cleanly 464 * @param serverNumber Used as index into a list. 465 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 466 * shutdown. Usually we do but you do not want to do this if you are running 467 * multiple regionservers in a test and you shut down one before end of the 468 * test. 469 * @return the region server that was stopped 470 */ 471 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 472 final boolean shutdownFS) { 473 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 474 LOG.info("Stopping " + server.toString()); 475 server.getRegionServer().stop("Stopping rs " + serverNumber); 476 return server; 477 } 478 479 /** 480 * Suspend the specified region server 481 * @param serverNumber Used as index into a list. 482 */ 483 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 484 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 485 LOG.info("Suspending {}", server.toString()); 486 server.suspend(); 487 return server; 488 } 489 490 /** 491 * Resume the specified region server 492 * @param serverNumber Used as index into a list. 493 */ 494 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 495 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 496 LOG.info("Resuming {}", server.toString()); 497 server.resume(); 498 return server; 499 } 500 501 /** 502 * Wait for the specified region server to stop. Removes this thread from list of running threads. 503 * @return Name of region server that just went down. 504 */ 505 public String waitOnRegionServer(final int serverNumber) { 506 return this.hbaseCluster.waitOnRegionServer(serverNumber); 507 } 508 509 /** 510 * Starts a master thread running 511 * @return New RegionServerThread 512 */ 513 public JVMClusterUtil.MasterThread startMaster() throws IOException { 514 Configuration c = HBaseConfiguration.create(conf); 515 User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++); 516 517 JVMClusterUtil.MasterThread t = null; 518 try { 519 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 520 t.start(); 521 } catch (InterruptedException ie) { 522 throw new IOException("Interrupted adding master to cluster", ie); 523 } 524 conf.set(HConstants.MASTER_ADDRS_KEY, 525 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 526 return t; 527 } 528 529 /** 530 * Returns the current active master, if available. 531 * @return the active HMaster, null if none is active. 532 */ 533 public HMaster getMaster() { 534 return this.hbaseCluster.getActiveMaster(); 535 } 536 537 /** 538 * Returns the current active master thread, if available. 539 * @return the active MasterThread, null if none is active. 540 */ 541 public MasterThread getMasterThread() { 542 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 543 if (mt.getMaster().isActiveMaster()) { 544 return mt; 545 } 546 } 547 return null; 548 } 549 550 /** 551 * Returns the master at the specified index, if available. 552 * @return the active HMaster, null if none is active. 553 */ 554 public HMaster getMaster(final int serverNumber) { 555 return this.hbaseCluster.getMaster(serverNumber); 556 } 557 558 /** 559 * Cause a master to exit without shutting down entire cluster. 560 * @param serverNumber Used as index into a list. 561 */ 562 public String abortMaster(int serverNumber) { 563 HMaster server = getMaster(serverNumber); 564 LOG.info("Aborting " + server.toString()); 565 server.abort("Aborting for tests", new Exception("Trace info")); 566 return server.toString(); 567 } 568 569 /** 570 * Shut down the specified master cleanly 571 * @param serverNumber Used as index into a list. 572 * @return the region server that was stopped 573 */ 574 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 575 return stopMaster(serverNumber, true); 576 } 577 578 /** 579 * Shut down the specified master cleanly 580 * @param serverNumber Used as index into a list. 581 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 582 * shutdown. Usually we do but you do not want to do this if you are running 583 * multiple master in a test and you shut down one before end of the test. 584 * @return the master that was stopped 585 */ 586 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 587 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 588 LOG.info("Stopping " + server.toString()); 589 server.getMaster().stop("Stopping master " + serverNumber); 590 return server; 591 } 592 593 /** 594 * Wait for the specified master to stop. Removes this thread from list of running threads. 595 * @return Name of master that just went down. 596 */ 597 public String waitOnMaster(final int serverNumber) { 598 return this.hbaseCluster.waitOnMaster(serverNumber); 599 } 600 601 /** 602 * Blocks until there is an active master and that master has completed initialization. 603 * @return true if an active master becomes available. false if there are no masters left. 604 */ 605 @Override 606 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 607 long start = EnvironmentEdgeManager.currentTime(); 608 while (EnvironmentEdgeManager.currentTime() - start < timeout) { 609 for (JVMClusterUtil.MasterThread mt : getMasterThreads()) { 610 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 611 return true; 612 } 613 } 614 Threads.sleep(100); 615 } 616 return false; 617 } 618 619 /** 620 * Returns list of master threads. 621 */ 622 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 623 return this.hbaseCluster.getMasters(); 624 } 625 626 /** 627 * Returns list of live master threads (skips the aborted and the killed) 628 */ 629 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 630 return this.hbaseCluster.getLiveMasters(); 631 } 632 633 /** 634 * Wait for Mini HBase Cluster to shut down. 635 */ 636 public void join() { 637 this.hbaseCluster.join(); 638 } 639 640 /** 641 * Shut down the mini HBase cluster 642 */ 643 @Override 644 public void shutdown() throws IOException { 645 if (this.hbaseCluster != null) { 646 this.hbaseCluster.shutdown(); 647 } 648 } 649 650 @Override 651 public void close() throws IOException { 652 } 653 654 @Override 655 public ClusterMetrics getClusterMetrics() throws IOException { 656 HMaster master = getMaster(); 657 return master == null ? null : master.getClusterMetrics(); 658 } 659 660 private void executeFlush(HRegion region) throws IOException { 661 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 662 return; 663 } 664 // retry 5 times if we can not flush 665 for (int i = 0; i < 5; i++) { 666 FlushResult result = region.flush(true); 667 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 668 return; 669 } 670 Threads.sleep(1000); 671 } 672 } 673 674 /** 675 * Call flushCache on all regions on all participating regionservers. 676 */ 677 public void flushcache() throws IOException { 678 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 679 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 680 executeFlush(r); 681 } 682 } 683 } 684 685 /** 686 * Call flushCache on all regions of the specified table. 687 */ 688 public void flushcache(TableName tableName) throws IOException { 689 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 690 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 691 if (r.getTableDescriptor().getTableName().equals(tableName)) { 692 executeFlush(r); 693 } 694 } 695 } 696 } 697 698 /** 699 * Call flushCache on all regions on all participating regionservers. 700 */ 701 public void compact(boolean major) throws IOException { 702 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 703 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 704 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 705 r.compact(major); 706 } 707 } 708 } 709 } 710 711 /** 712 * Call flushCache on all regions of the specified table. 713 */ 714 public void compact(TableName tableName, boolean major) throws IOException { 715 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 716 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 717 if (r.getTableDescriptor().getTableName().equals(tableName)) { 718 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 719 r.compact(major); 720 } 721 } 722 } 723 } 724 } 725 726 /** 727 * Returns number of live region servers in the cluster currently. 728 */ 729 public int getNumLiveRegionServers() { 730 return this.hbaseCluster.getLiveRegionServers().size(); 731 } 732 733 /** 734 * Returns list of region server threads. Does not return the master even though it is also a 735 * region server. 736 */ 737 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 738 return this.hbaseCluster.getRegionServers(); 739 } 740 741 /** 742 * Returns List of live region server threads (skips the aborted and the killed) 743 */ 744 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 745 return this.hbaseCluster.getLiveRegionServers(); 746 } 747 748 /** 749 * Grab a numbered region server of your choice. 750 * @return region server 751 */ 752 public HRegionServer getRegionServer(int serverNumber) { 753 return hbaseCluster.getRegionServer(serverNumber); 754 } 755 756 public HRegionServer getRegionServer(ServerName serverName) { 757 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 758 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 759 } 760 761 public List<HRegion> getRegions(byte[] tableName) { 762 return getRegions(TableName.valueOf(tableName)); 763 } 764 765 public List<HRegion> getRegions(TableName tableName) { 766 List<HRegion> ret = new ArrayList<>(); 767 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 768 HRegionServer hrs = rst.getRegionServer(); 769 for (Region region : hrs.getOnlineRegionsLocalContext()) { 770 if (region.getTableDescriptor().getTableName().equals(tableName)) { 771 ret.add((HRegion) region); 772 } 773 } 774 } 775 return ret; 776 } 777 778 /** 779 * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 780 * carrying regionName. Returns -1 if none found. 781 */ 782 public int getServerWithMeta() { 783 return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 784 } 785 786 /** 787 * Get the location of the specified region 788 * @param regionName Name of the region in bytes 789 * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 790 * carrying hbase:meta. Returns -1 if none found. 791 */ 792 public int getServerWith(byte[] regionName) { 793 int index = 0; 794 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 795 HRegionServer hrs = rst.getRegionServer(); 796 if (!hrs.isStopped()) { 797 Region region = hrs.getOnlineRegion(regionName); 798 if (region != null) { 799 return index; 800 } 801 } 802 index++; 803 } 804 return -1; 805 } 806 807 @Override 808 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 809 throws IOException { 810 int index = getServerWith(regionName); 811 if (index < 0) { 812 return null; 813 } 814 return getRegionServer(index).getServerName(); 815 } 816 817 /** 818 * Counts the total numbers of regions being served by the currently online region servers by 819 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 820 * catalog tables. 821 * @return number of regions being served by all region servers 822 */ 823 public long countServedRegions() { 824 long count = 0; 825 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 826 count += rst.getRegionServer().getNumberOfOnlineRegions(); 827 } 828 return count; 829 } 830 831 /** 832 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 833 * mini-cluster back for clean shutdown. 834 */ 835 public void killAll() { 836 // Do backups first. 837 MasterThread activeMaster = null; 838 for (MasterThread masterThread : getMasterThreads()) { 839 if (!masterThread.getMaster().isActiveMaster()) { 840 masterThread.getMaster().abort("killAll"); 841 } else { 842 activeMaster = masterThread; 843 } 844 } 845 // Do active after. 846 if (activeMaster != null) { 847 activeMaster.getMaster().abort("killAll"); 848 } 849 for (RegionServerThread rst : getRegionServerThreads()) { 850 rst.getRegionServer().abort("killAll"); 851 } 852 } 853 854 @Override 855 public void waitUntilShutDown() { 856 this.hbaseCluster.join(); 857 } 858 859 public List<HRegion> findRegionsForTable(TableName tableName) { 860 ArrayList<HRegion> ret = new ArrayList<>(); 861 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 862 HRegionServer hrs = rst.getRegionServer(); 863 for (Region region : hrs.getRegions(tableName)) { 864 if (region.getTableDescriptor().getTableName().equals(tableName)) { 865 ret.add((HRegion) region); 866 } 867 } 868 } 869 return ret; 870 } 871 872 protected int getRegionServerIndex(ServerName serverName) { 873 // we have a small number of region servers, this should be fine for now. 874 List<RegionServerThread> servers = getRegionServerThreads(); 875 for (int i = 0; i < servers.size(); i++) { 876 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 877 return i; 878 } 879 } 880 return -1; 881 } 882 883 protected int getMasterIndex(ServerName serverName) { 884 List<MasterThread> masters = getMasterThreads(); 885 for (int i = 0; i < masters.size(); i++) { 886 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 887 return i; 888 } 889 } 890 return -1; 891 } 892}