001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.client.RegionInfoBuilder;
028import org.apache.hadoop.hbase.client.RegionReplicaUtil;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.regionserver.HRegion;
031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.Region;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.test.MetricsAssertHelper;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * This class creates a single process HBase cluster. each server. The master uses the 'default'
048 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem
049 * instance each and will close down their instance on the way out.
050 */
051@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
052@InterfaceStability.Evolving
053public class SingleProcessHBaseCluster extends HBaseClusterInterface {
054  private static final Logger LOG =
055    LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName());
056  public LocalHBaseCluster hbaseCluster;
057  private static int index;
058
059  /**
060   * Start a MiniHBaseCluster.
061   * @param conf             Configuration to be used for cluster
062   * @param numRegionServers initial number of region servers to start.
063   */
064  public SingleProcessHBaseCluster(Configuration conf, int numRegionServers)
065    throws IOException, InterruptedException {
066    this(conf, 1, numRegionServers);
067  }
068
069  /**
070   * Start a MiniHBaseCluster.
071   * @param conf             Configuration to be used for cluster
072   * @param numMasters       initial number of masters to start.
073   * @param numRegionServers initial number of region servers to start.
074   */
075  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
076    throws IOException, InterruptedException {
077    this(conf, numMasters, numRegionServers, null, null);
078  }
079
080  /**
081   * Start a MiniHBaseCluster.
082   * @param conf             Configuration to be used for cluster
083   * @param numMasters       initial number of masters to start.
084   * @param numRegionServers initial number of region servers to start.
085   */
086  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
087    Class<? extends HMaster> masterClass,
088    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
089    throws IOException, InterruptedException {
090    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
091  }
092
093  /**
094   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
095   *                restart where for sure the regionservers come up on same address+port (but just
096   *                with different startcode); by default mini hbase clusters choose new arbitrary
097   *                ports on each cluster start.
098   */
099  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
100    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
101    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
102    throws IOException, InterruptedException {
103    super(conf);
104
105    // Hadoop 2
106    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
107
108    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
109      regionserverClass);
110    this.initialClusterStatus = getClusterMetrics();
111  }
112
113  public Configuration getConfiguration() {
114    return this.conf;
115  }
116
117  /**
118   * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
119   * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem
120   * only, not All filesystems as the FileSystem system exit hook does.
121   */
122  public static class MiniHBaseClusterRegionServer extends HRegionServer {
123
124    private User user = null;
125    /**
126     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
127     * restarted instances of the same server will have different ServerName and will not coincide
128     * with past dead ones. So there's no need to cleanup this list.
129     */
130    static Set<ServerName> killedServers = new HashSet<>();
131
132    public MiniHBaseClusterRegionServer(Configuration conf)
133      throws IOException, InterruptedException {
134      super(conf);
135      this.user = User.getCurrent();
136    }
137
138    @Override
139    public void run() {
140      try {
141        this.user.runAs(new PrivilegedAction<Object>() {
142          @Override
143          public Object run() {
144            runRegionServer();
145            return null;
146          }
147        });
148      } catch (Throwable t) {
149        LOG.error("Exception in run", t);
150      }
151    }
152
153    private void runRegionServer() {
154      super.run();
155    }
156
157    @Override
158    protected void kill() {
159      killedServers.add(getServerName());
160      super.kill();
161    }
162
163    @Override
164    public void abort(final String reason, final Throwable cause) {
165      this.user.runAs(new PrivilegedAction<Object>() {
166        @Override
167        public Object run() {
168          abortRegionServer(reason, cause);
169          return null;
170        }
171      });
172    }
173
174    private void abortRegionServer(String reason, Throwable cause) {
175      super.abort(reason, cause);
176    }
177  }
178
179  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
180    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
181    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
182    throws IOException, InterruptedException {
183    try {
184      if (masterClass == null) {
185        masterClass = HMaster.class;
186      }
187      if (regionserverClass == null) {
188        regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class;
189      }
190
191      // start up a LocalHBaseCluster
192      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
193        masterClass, regionserverClass);
194
195      // manually add the regionservers as other users
196      for (int i = 0; i < nRegionNodes; i++) {
197        Configuration rsConf = HBaseConfiguration.create(conf);
198        if (rsPorts != null) {
199          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
200        }
201        User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++);
202        hbaseCluster.addRegionServer(rsConf, i, user);
203      }
204
205      hbaseCluster.startup();
206    } catch (IOException e) {
207      shutdown();
208      throw e;
209    } catch (Throwable t) {
210      LOG.error("Error starting cluster", t);
211      shutdown();
212      throw new IOException("Shutting down", t);
213    }
214  }
215
216  @Override
217  public void startRegionServer(String hostname, int port) throws IOException {
218    final Configuration newConf = HBaseConfiguration.create(conf);
219    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
220    startRegionServer(newConf);
221  }
222
223  @Override
224  public void killRegionServer(ServerName serverName) throws IOException {
225    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
226    if (server instanceof MiniHBaseClusterRegionServer) {
227      LOG.info("Killing " + server.toString());
228      ((MiniHBaseClusterRegionServer) server).kill();
229    } else {
230      abortRegionServer(getRegionServerIndex(serverName));
231    }
232  }
233
234  @Override
235  public boolean isKilledRS(ServerName serverName) {
236    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
237  }
238
239  @Override
240  public void stopRegionServer(ServerName serverName) throws IOException {
241    stopRegionServer(getRegionServerIndex(serverName));
242  }
243
244  @Override
245  public void suspendRegionServer(ServerName serverName) throws IOException {
246    suspendRegionServer(getRegionServerIndex(serverName));
247  }
248
249  @Override
250  public void resumeRegionServer(ServerName serverName) throws IOException {
251    resumeRegionServer(getRegionServerIndex(serverName));
252  }
253
254  @Override
255  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
256    // ignore timeout for now
257    waitOnRegionServer(getRegionServerIndex(serverName));
258  }
259
260  @Override
261  public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException {
262    LOG.warn("Waiting for regionserver to suspend on mini cluster is not supported");
263  }
264
265  @Override
266  public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException {
267    LOG.warn("Waiting for regionserver to resume on mini cluster is not supported");
268  }
269
270  @Override
271  public void startZkNode(String hostname, int port) throws IOException {
272    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
273  }
274
275  @Override
276  public void killZkNode(ServerName serverName) throws IOException {
277    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
278  }
279
280  @Override
281  public void stopZkNode(ServerName serverName) throws IOException {
282    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
283  }
284
285  @Override
286  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
287    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
288  }
289
290  @Override
291  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
292    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
293  }
294
295  @Override
296  public void startDataNode(ServerName serverName) throws IOException {
297    LOG.warn("Starting datanodes on mini cluster is not supported");
298  }
299
300  @Override
301  public void killDataNode(ServerName serverName) throws IOException {
302    LOG.warn("Aborting datanodes on mini cluster is not supported");
303  }
304
305  @Override
306  public void stopDataNode(ServerName serverName) throws IOException {
307    LOG.warn("Stopping datanodes on mini cluster is not supported");
308  }
309
310  @Override
311  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
312    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
313  }
314
315  @Override
316  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
317    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
318  }
319
320  @Override
321  public void startNameNode(ServerName serverName) throws IOException {
322    LOG.warn("Starting namenodes on mini cluster is not supported");
323  }
324
325  @Override
326  public void killNameNode(ServerName serverName) throws IOException {
327    LOG.warn("Aborting namenodes on mini cluster is not supported");
328  }
329
330  @Override
331  public void stopNameNode(ServerName serverName) throws IOException {
332    LOG.warn("Stopping namenodes on mini cluster is not supported");
333  }
334
335  @Override
336  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
337    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
338  }
339
340  @Override
341  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
342    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
343  }
344
345  @Override
346  public void startJournalNode(ServerName serverName) {
347    LOG.warn("Starting journalnodes on mini cluster is not supported");
348  }
349
350  @Override
351  public void killJournalNode(ServerName serverName) {
352    LOG.warn("Aborting journalnodes on mini cluster is not supported");
353  }
354
355  @Override
356  public void stopJournalNode(ServerName serverName) {
357    LOG.warn("Stopping journalnodes on mini cluster is not supported");
358  }
359
360  @Override
361  public void waitForJournalNodeToStart(ServerName serverName, long timeout) {
362    LOG.warn("Waiting for journalnodes to start on mini cluster is not supported");
363  }
364
365  @Override
366  public void waitForJournalNodeToStop(ServerName serverName, long timeout) {
367    LOG.warn("Waiting for journalnodes to stop on mini cluster is not supported");
368  }
369
370  @Override
371  public void startMaster(String hostname, int port) throws IOException {
372    this.startMaster();
373  }
374
375  @Override
376  public void killMaster(ServerName serverName) throws IOException {
377    abortMaster(getMasterIndex(serverName));
378  }
379
380  @Override
381  public void stopMaster(ServerName serverName) throws IOException {
382    stopMaster(getMasterIndex(serverName));
383  }
384
385  @Override
386  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
387    // ignore timeout for now
388    waitOnMaster(getMasterIndex(serverName));
389  }
390
391  /**
392   * Starts a region server thread running
393   * @return New RegionServerThread
394   */
395  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
396    final Configuration newConf = HBaseConfiguration.create(conf);
397    return startRegionServer(newConf);
398  }
399
400  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
401    throws IOException {
402    User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++);
403    JVMClusterUtil.RegionServerThread t = null;
404    try {
405      t =
406        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
407      t.start();
408      t.waitForServerOnline();
409    } catch (InterruptedException ie) {
410      throw new IOException("Interrupted adding regionserver to cluster", ie);
411    }
412    return t;
413  }
414
415  /**
416   * Starts a region server thread and waits until its processed by master. Throws an exception when
417   * it can't start a region server or when the region server is not processed by master within the
418   * timeout.
419   * @return New RegionServerThread
420   */
421  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
422    throws IOException {
423
424    JVMClusterUtil.RegionServerThread t = startRegionServer();
425    ServerName rsServerName = t.getRegionServer().getServerName();
426
427    long start = EnvironmentEdgeManager.currentTime();
428    ClusterMetrics clusterStatus = getClusterMetrics();
429    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
430      if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
431        return t;
432      }
433      Threads.sleep(100);
434    }
435    if (t.getRegionServer().isOnline()) {
436      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
437    } else {
438      throw new IOException("RS: " + rsServerName + " is offline");
439    }
440  }
441
442  /**
443   * Cause a region server to exit doing basic clean up only on its way out.
444   * @param serverNumber Used as index into a list.
445   */
446  public String abortRegionServer(int serverNumber) {
447    HRegionServer server = getRegionServer(serverNumber);
448    LOG.info("Aborting " + server.toString());
449    server.abort("Aborting for tests", new Exception("Trace info"));
450    return server.toString();
451  }
452
453  /**
454   * Shut down the specified region server cleanly
455   * @param serverNumber Used as index into a list.
456   * @return the region server that was stopped
457   */
458  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
459    return stopRegionServer(serverNumber, true);
460  }
461
462  /**
463   * Shut down the specified region server cleanly
464   * @param serverNumber Used as index into a list.
465   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
466   *                     shutdown. Usually we do but you do not want to do this if you are running
467   *                     multiple regionservers in a test and you shut down one before end of the
468   *                     test.
469   * @return the region server that was stopped
470   */
471  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
472    final boolean shutdownFS) {
473    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
474    LOG.info("Stopping " + server.toString());
475    server.getRegionServer().stop("Stopping rs " + serverNumber);
476    return server;
477  }
478
479  /**
480   * Suspend the specified region server
481   * @param serverNumber Used as index into a list.
482   */
483  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
484    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
485    LOG.info("Suspending {}", server.toString());
486    server.suspend();
487    return server;
488  }
489
490  /**
491   * Resume the specified region server
492   * @param serverNumber Used as index into a list.
493   */
494  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
495    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
496    LOG.info("Resuming {}", server.toString());
497    server.resume();
498    return server;
499  }
500
501  /**
502   * Wait for the specified region server to stop. Removes this thread from list of running threads.
503   * @return Name of region server that just went down.
504   */
505  public String waitOnRegionServer(final int serverNumber) {
506    return this.hbaseCluster.waitOnRegionServer(serverNumber);
507  }
508
509  /**
510   * Starts a master thread running
511   * @return New RegionServerThread
512   */
513  public JVMClusterUtil.MasterThread startMaster() throws IOException {
514    Configuration c = HBaseConfiguration.create(conf);
515    User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++);
516
517    JVMClusterUtil.MasterThread t = null;
518    try {
519      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
520      t.start();
521    } catch (InterruptedException ie) {
522      throw new IOException("Interrupted adding master to cluster", ie);
523    }
524    conf.set(HConstants.MASTER_ADDRS_KEY,
525      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
526    return t;
527  }
528
529  /**
530   * Returns the current active master, if available.
531   * @return the active HMaster, null if none is active.
532   */
533  public HMaster getMaster() {
534    return this.hbaseCluster.getActiveMaster();
535  }
536
537  /**
538   * Returns the current active master thread, if available.
539   * @return the active MasterThread, null if none is active.
540   */
541  public MasterThread getMasterThread() {
542    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
543      if (mt.getMaster().isActiveMaster()) {
544        return mt;
545      }
546    }
547    return null;
548  }
549
550  /**
551   * Returns the master at the specified index, if available.
552   * @return the active HMaster, null if none is active.
553   */
554  public HMaster getMaster(final int serverNumber) {
555    return this.hbaseCluster.getMaster(serverNumber);
556  }
557
558  /**
559   * Cause a master to exit without shutting down entire cluster.
560   * @param serverNumber Used as index into a list.
561   */
562  public String abortMaster(int serverNumber) {
563    HMaster server = getMaster(serverNumber);
564    LOG.info("Aborting " + server.toString());
565    server.abort("Aborting for tests", new Exception("Trace info"));
566    return server.toString();
567  }
568
569  /**
570   * Shut down the specified master cleanly
571   * @param serverNumber Used as index into a list.
572   * @return the region server that was stopped
573   */
574  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
575    return stopMaster(serverNumber, true);
576  }
577
578  /**
579   * Shut down the specified master cleanly
580   * @param serverNumber Used as index into a list.
581   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
582   *                     shutdown. Usually we do but you do not want to do this if you are running
583   *                     multiple master in a test and you shut down one before end of the test.
584   * @return the master that was stopped
585   */
586  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
587    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
588    LOG.info("Stopping " + server.toString());
589    server.getMaster().stop("Stopping master " + serverNumber);
590    return server;
591  }
592
593  /**
594   * Wait for the specified master to stop. Removes this thread from list of running threads.
595   * @return Name of master that just went down.
596   */
597  public String waitOnMaster(final int serverNumber) {
598    return this.hbaseCluster.waitOnMaster(serverNumber);
599  }
600
601  /**
602   * Blocks until there is an active master and that master has completed initialization.
603   * @return true if an active master becomes available. false if there are no masters left.
604   */
605  @Override
606  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
607    long start = EnvironmentEdgeManager.currentTime();
608    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
609      for (JVMClusterUtil.MasterThread mt : getMasterThreads()) {
610        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
611          return true;
612        }
613      }
614      Threads.sleep(100);
615    }
616    return false;
617  }
618
619  /**
620   * Returns list of master threads.
621   */
622  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
623    return this.hbaseCluster.getMasters();
624  }
625
626  /**
627   * Returns list of live master threads (skips the aborted and the killed)
628   */
629  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
630    return this.hbaseCluster.getLiveMasters();
631  }
632
633  /**
634   * Wait for Mini HBase Cluster to shut down.
635   */
636  public void join() {
637    this.hbaseCluster.join();
638  }
639
640  /**
641   * Shut down the mini HBase cluster
642   */
643  @Override
644  public void shutdown() throws IOException {
645    if (this.hbaseCluster != null) {
646      this.hbaseCluster.shutdown();
647    }
648  }
649
650  @Override
651  public void close() throws IOException {
652  }
653
654  @Override
655  public ClusterMetrics getClusterMetrics() throws IOException {
656    HMaster master = getMaster();
657    return master == null ? null : master.getClusterMetrics();
658  }
659
660  private void executeFlush(HRegion region) throws IOException {
661    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
662      return;
663    }
664    // retry 5 times if we can not flush
665    for (int i = 0; i < 5; i++) {
666      FlushResult result = region.flush(true);
667      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
668        return;
669      }
670      Threads.sleep(1000);
671    }
672  }
673
674  /**
675   * Call flushCache on all regions on all participating regionservers.
676   */
677  public void flushcache() throws IOException {
678    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
679      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
680        executeFlush(r);
681      }
682    }
683  }
684
685  /**
686   * Call flushCache on all regions of the specified table.
687   */
688  public void flushcache(TableName tableName) throws IOException {
689    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
690      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
691        if (r.getTableDescriptor().getTableName().equals(tableName)) {
692          executeFlush(r);
693        }
694      }
695    }
696  }
697
698  /**
699   * Call flushCache on all regions on all participating regionservers.
700   */
701  public void compact(boolean major) throws IOException {
702    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
703      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
704        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
705          r.compact(major);
706        }
707      }
708    }
709  }
710
711  /**
712   * Call flushCache on all regions of the specified table.
713   */
714  public void compact(TableName tableName, boolean major) throws IOException {
715    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
716      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
717        if (r.getTableDescriptor().getTableName().equals(tableName)) {
718          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
719            r.compact(major);
720          }
721        }
722      }
723    }
724  }
725
726  /**
727   * Returns number of live region servers in the cluster currently.
728   */
729  public int getNumLiveRegionServers() {
730    return this.hbaseCluster.getLiveRegionServers().size();
731  }
732
733  /**
734   * Returns list of region server threads. Does not return the master even though it is also a
735   * region server.
736   */
737  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
738    return this.hbaseCluster.getRegionServers();
739  }
740
741  /**
742   * Returns List of live region server threads (skips the aborted and the killed)
743   */
744  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
745    return this.hbaseCluster.getLiveRegionServers();
746  }
747
748  /**
749   * Grab a numbered region server of your choice.
750   * @return region server
751   */
752  public HRegionServer getRegionServer(int serverNumber) {
753    return hbaseCluster.getRegionServer(serverNumber);
754  }
755
756  public HRegionServer getRegionServer(ServerName serverName) {
757    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
758      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
759  }
760
761  public List<HRegion> getRegions(byte[] tableName) {
762    return getRegions(TableName.valueOf(tableName));
763  }
764
765  public List<HRegion> getRegions(TableName tableName) {
766    List<HRegion> ret = new ArrayList<>();
767    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
768      HRegionServer hrs = rst.getRegionServer();
769      for (Region region : hrs.getOnlineRegionsLocalContext()) {
770        if (region.getTableDescriptor().getTableName().equals(tableName)) {
771          ret.add((HRegion) region);
772        }
773      }
774    }
775    return ret;
776  }
777
778  /**
779   * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
780   * carrying regionName. Returns -1 if none found.
781   */
782  public int getServerWithMeta() {
783    return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
784  }
785
786  /**
787   * Get the location of the specified region
788   * @param regionName Name of the region in bytes
789   * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
790   *         carrying hbase:meta. Returns -1 if none found.
791   */
792  public int getServerWith(byte[] regionName) {
793    int index = 0;
794    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
795      HRegionServer hrs = rst.getRegionServer();
796      if (!hrs.isStopped()) {
797        Region region = hrs.getOnlineRegion(regionName);
798        if (region != null) {
799          return index;
800        }
801      }
802      index++;
803    }
804    return -1;
805  }
806
807  @Override
808  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
809    throws IOException {
810    int index = getServerWith(regionName);
811    if (index < 0) {
812      return null;
813    }
814    return getRegionServer(index).getServerName();
815  }
816
817  /**
818   * Counts the total numbers of regions being served by the currently online region servers by
819   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
820   * catalog tables.
821   * @return number of regions being served by all region servers
822   */
823  public long countServedRegions() {
824    long count = 0;
825    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
826      count += rst.getRegionServer().getNumberOfOnlineRegions();
827    }
828    return count;
829  }
830
831  /**
832   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
833   * mini-cluster back for clean shutdown.
834   */
835  public void killAll() {
836    // Do backups first.
837    MasterThread activeMaster = null;
838    for (MasterThread masterThread : getMasterThreads()) {
839      if (!masterThread.getMaster().isActiveMaster()) {
840        masterThread.getMaster().abort("killAll");
841      } else {
842        activeMaster = masterThread;
843      }
844    }
845    // Do active after.
846    if (activeMaster != null) {
847      activeMaster.getMaster().abort("killAll");
848    }
849    for (RegionServerThread rst : getRegionServerThreads()) {
850      rst.getRegionServer().abort("killAll");
851    }
852  }
853
854  @Override
855  public void waitUntilShutDown() {
856    this.hbaseCluster.join();
857  }
858
859  public List<HRegion> findRegionsForTable(TableName tableName) {
860    ArrayList<HRegion> ret = new ArrayList<>();
861    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
862      HRegionServer hrs = rst.getRegionServer();
863      for (Region region : hrs.getRegions(tableName)) {
864        if (region.getTableDescriptor().getTableName().equals(tableName)) {
865          ret.add((HRegion) region);
866        }
867      }
868    }
869    return ret;
870  }
871
872  protected int getRegionServerIndex(ServerName serverName) {
873    // we have a small number of region servers, this should be fine for now.
874    List<RegionServerThread> servers = getRegionServerThreads();
875    for (int i = 0; i < servers.size(); i++) {
876      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
877        return i;
878      }
879    }
880    return -1;
881  }
882
883  protected int getMasterIndex(ServerName serverName) {
884    List<MasterThread> masters = getMasterThreads();
885    for (int i = 0; i < masters.size(); i++) {
886      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
887        return i;
888      }
889    }
890    return -1;
891  }
892}