001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.RegionInfoBuilder;
029import org.apache.hadoop.hbase.client.RegionReplicaUtil;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.JVMClusterUtil;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.util.Threads;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
048
/**
 * This class creates a single process HBase cluster. The master uses the 'default' FileSystem.
 * The RegionServers, if we are running on DistributedFilesystem, create a FileSystem instance
 * each and will close down their instance on the way out.
 */
054@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
055@InterfaceStability.Evolving
056public class SingleProcessHBaseCluster extends HBaseClusterInterface {
  // Class-wide logger, named after this class.
  private static final Logger LOG =
    LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName());
  // The wrapped in-process cluster; assigned in init() and delegated to by most methods here.
  public LocalHBaseCluster hbaseCluster;
  // Static counter appended to the ".hfs." suffix whenever a distinct test user is requested for
  // a new region server or master; post-incremented on every use.
  private static int index;
061
  /**
   * Start a single process HBase cluster with one master. Delegates to the three-argument
   * constructor, passing 1 as the number of masters.
   * @param conf             Configuration to be used for cluster
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException          propagated from the delegated constructor
   * @throws InterruptedException propagated from the delegated constructor
   */
  public SingleProcessHBaseCluster(Configuration conf, int numRegionServers)
    throws IOException, InterruptedException {
    this(conf, 1, numRegionServers);
  }
071
  /**
   * Start a single process HBase cluster. Delegates to the five-argument constructor, passing
   * null for both the master and the region server implementation classes.
   * @param conf             Configuration to be used for cluster
   * @param numMasters       initial number of masters to start.
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException          propagated from the delegated constructor
   * @throws InterruptedException propagated from the delegated constructor
   */
  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    this(conf, numMasters, numRegionServers, null, null);
  }
082
  /**
   * Start a single process HBase cluster with the given master and region server implementations.
   * Delegates to the seven-argument constructor with zero always-standby masters and no fixed
   * region server ports.
   * @param conf              Configuration to be used for cluster
   * @param numMasters        initial number of masters to start.
   * @param numRegionServers  initial number of region servers to start.
   * @param masterClass       master implementation to use; passed through unchanged, may be null
   * @param regionserverClass region server implementation to use; passed through unchanged, may
   *                          be null
   * @throws IOException          propagated from the delegated constructor
   * @throws InterruptedException propagated from the delegated constructor
   */
  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
    Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws IOException, InterruptedException {
    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
  }
095
  /**
   * Creates the cluster: initializes the metrics assert helper, builds and starts the underlying
   * {@link LocalHBaseCluster} through {@code init(...)}, then records the current cluster metrics
   * as the initial cluster status.
   * @param conf                    Configuration to be used for cluster
   * @param numMasters              initial number of masters to start.
   * @param numAlwaysStandByMasters number of always-standby masters, handed to the
   *                                LocalHBaseCluster constructor
   * @param numRegionServers        initial number of region servers to start.
   * @param rsPorts                 Ports that RegionServer should use; pass ports if you want to
   *                                test cluster restart where for sure the regionservers come up
   *                                on same address+port (but just with different startcode); by
   *                                default mini hbase clusters choose new arbitrary ports on each
   *                                cluster start. May be null.
   * @param masterClass             master implementation; null selects {@link HMaster}
   * @param regionserverClass       region server implementation; null selects
   *                                {@link MiniHBaseClusterRegionServer}
   * @throws IOException          if the cluster cannot be started
   * @throws InterruptedException declared by {@code init(...)}
   */
  public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws IOException, InterruptedException {
    super(conf);

    // Hadoop 2
    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();

    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
      regionserverClass);
    // Snapshot taken right after startup; null when no master is active yet (see
    // getClusterMetrics()).
    this.initialClusterStatus = getClusterMetrics();
  }
115
  /**
   * Returns the configuration held by this cluster (the {@code conf} field).
   */
  public Configuration getConfiguration() {
    return this.conf;
  }
119
  /**
   * Region server subclass used by this mini cluster. It remembers the {@link User} that was
   * current when it was constructed and runs (and aborts) under that user. After the
   * report-for-duty response has been handled it prepares a {@link SingleFileSystemShutdownThread}
   * for the FileSystem returned by {@code getFileSystem()}, and starts that thread when
   * {@link #run()} exits. This shuts down its own FileSystem only, not all FileSystems as the
   * FileSystem system exit hook does.
   */
  public static class MiniHBaseClusterRegionServer extends HRegionServer {
    // Thread that closes this server's FileSystem; created in handleReportForDutyResponse and
    // started from the finally block of run(). Stays null if that callback never ran.
    private Thread shutdownThread = null;
    // User captured at construction time; run() and abort() execute under it.
    private User user = null;
    /**
     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
     * restarted instances of the same server will have different ServerName and will not coincide
     * with past dead ones. So there's no need to cleanup this list.
     */
    static Set<ServerName> killedServers = new HashSet<>();

    /**
     * Creates the region server and records the current user for later privileged execution.
     * @param conf configuration handed to the {@link HRegionServer} constructor
     */
    public MiniHBaseClusterRegionServer(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
      this.user = User.getCurrent();
    }

    /**
     * Lets the superclass handle the response, then creates (but does not start) the thread that
     * will close this server's FileSystem.
     */
    @Override
    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
      super.handleReportForDutyResponse(c);
      // Run this thread to shutdown our filesystem on way out.
      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
    }

    /**
     * Runs the region server as the captured user. Any Throwable is logged rather than propagated,
     * and the FileSystem shutdown thread, if one was created, is started on the way out.
     */
    @Override
    public void run() {
      try {
        this.user.runAs(new PrivilegedAction<Object>() {
          @Override
          public Object run() {
            runRegionServer();
            return null;
          }
        });
      } catch (Throwable t) {
        LOG.error("Exception in run", t);
      } finally {
        // Run this on the way out.
        if (this.shutdownThread != null) {
          this.shutdownThread.start();
          // NOTE(review): presumably waits up to 30000 ms for the thread to finish -- confirm
          // against Threads.shutdown.
          Threads.shutdown(this.shutdownThread, 30000);
        }
      }
    }

    // Bridge so the anonymous PrivilegedAction in run() can reach HRegionServer.run().
    private void runRegionServer() {
      super.run();
    }

    /**
     * Records this server's name in {@link #killedServers} before delegating to the superclass.
     */
    @Override
    protected void kill() {
      killedServers.add(getServerName());
      super.kill();
    }

    /**
     * Aborts the region server as the captured user.
     * @param reason textual reason, passed through to the superclass abort
     * @param cause  cause, passed through to the superclass abort
     */
    @Override
    public void abort(final String reason, final Throwable cause) {
      this.user.runAs(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
          abortRegionServer(reason, cause);
          return null;
        }
      });
    }

    // Bridge so the anonymous PrivilegedAction in abort() can reach HRegionServer.abort().
    private void abortRegionServer(String reason, Throwable cause) {
      super.abort(reason, cause);
    }
  }
195
196  /**
197   * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
198   * does.
199   */
200  static class SingleFileSystemShutdownThread extends Thread {
201    private final FileSystem fs;
202
203    SingleFileSystemShutdownThread(final FileSystem fs) {
204      super("Shutdown of " + fs);
205      this.fs = fs;
206    }
207
208    @Override
209    public void run() {
210      try {
211        LOG.info("Hook closing fs=" + this.fs);
212        this.fs.close();
213      } catch (NullPointerException npe) {
214        LOG.debug("Need to fix these: " + npe.toString());
215      } catch (IOException e) {
216        LOG.warn("Running hook", e);
217      }
218    }
219  }
220
221  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
222    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
223    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
224    throws IOException, InterruptedException {
225    try {
226      if (masterClass == null) {
227        masterClass = HMaster.class;
228      }
229      if (regionserverClass == null) {
230        regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class;
231      }
232
233      // start up a LocalHBaseCluster
234      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
235        masterClass, regionserverClass);
236
237      // manually add the regionservers as other users
238      for (int i = 0; i < nRegionNodes; i++) {
239        Configuration rsConf = HBaseConfiguration.create(conf);
240        if (rsPorts != null) {
241          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
242        }
243        User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++);
244        hbaseCluster.addRegionServer(rsConf, i, user);
245      }
246
247      hbaseCluster.startup();
248    } catch (IOException e) {
249      shutdown();
250      throw e;
251    } catch (Throwable t) {
252      LOG.error("Error starting cluster", t);
253      shutdown();
254      throw new IOException("Shutting down", t);
255    }
256  }
257
  /**
   * Starts a new region server on the given port. A copy of the cluster configuration is made
   * and its region server port is set to {@code port}.
   * @param hostname not read by this implementation
   * @param port     port set as {@code HConstants.REGIONSERVER_PORT} for the new server
   */
  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    final Configuration newConf = HBaseConfiguration.create(conf);
    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
    startRegionServer(newConf);
  }
264
265  @Override
266  public void killRegionServer(ServerName serverName) throws IOException {
267    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
268    if (server instanceof MiniHBaseClusterRegionServer) {
269      LOG.info("Killing " + server.toString());
270      ((MiniHBaseClusterRegionServer) server).kill();
271    } else {
272      abortRegionServer(getRegionServerIndex(serverName));
273    }
274  }
275
  /**
   * Returns true if the given server name was recorded in
   * {@link MiniHBaseClusterRegionServer#killedServers}, i.e. its {@code kill()} method was called.
   */
  @Override
  public boolean isKilledRS(ServerName serverName) {
    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
  }
280
  /**
   * Stops the named region server by resolving its index and delegating to
   * {@link #stopRegionServer(int)}.
   */
  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    stopRegionServer(getRegionServerIndex(serverName));
  }
285
  /**
   * Suspends the named region server by resolving its index and delegating to
   * {@link #suspendRegionServer(int)}.
   */
  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    suspendRegionServer(getRegionServerIndex(serverName));
  }
290
  /**
   * Resumes the named region server by resolving its index and delegating to
   * {@link #resumeRegionServer(int)}.
   */
  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    resumeRegionServer(getRegionServerIndex(serverName));
  }
295
  /**
   * Waits for the named region server to stop by delegating to {@link #waitOnRegionServer(int)}.
   * @param serverName name of the region server to wait on
   * @param timeout    currently ignored
   */
  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    // ignore timeout for now
    waitOnRegionServer(getRegionServerIndex(serverName));
  }
301
  /**
   * Not supported on the mini cluster: only logs a warning; the arguments are ignored.
   */
  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
  }
306
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
  }
311
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
  }
316
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
  }
321
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
  }
326
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.warn("Starting datanodes on mini cluster is not supported");
  }
331
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting datanodes on mini cluster is not supported");
  }
336
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping datanodes on mini cluster is not supported");
  }
341
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
  }
346
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
  }
351
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.warn("Starting namenodes on mini cluster is not supported");
  }
356
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting namenodes on mini cluster is not supported");
  }
361
  /**
   * Not supported on the mini cluster: only logs a warning; the argument is ignored.
   */
  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping namenodes on mini cluster is not supported");
  }
366
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
  }
371
  /**
   * Not supported on the mini cluster: only logs a warning and returns immediately; the
   * arguments are ignored.
   */
  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
  }
376
  /**
   * Starts a new master by delegating to {@link #startMaster()}.
   * @param hostname not read by this implementation
   * @param port     not read by this implementation
   */
  @Override
  public void startMaster(String hostname, int port) throws IOException {
    this.startMaster();
  }
381
  /**
   * Kills the named master by resolving its index and delegating to {@link #abortMaster(int)}.
   */
  @Override
  public void killMaster(ServerName serverName) throws IOException {
    abortMaster(getMasterIndex(serverName));
  }
386
  /**
   * Stops the named master by resolving its index and delegating to {@link #stopMaster(int)}.
   */
  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    stopMaster(getMasterIndex(serverName));
  }
391
  /**
   * Waits for the named master to stop by delegating to {@link #waitOnMaster(int)}.
   * @param serverName name of the master to wait on
   * @param timeout    currently ignored
   */
  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    // ignore timeout for now
    waitOnMaster(getMasterIndex(serverName));
  }
397
  /**
   * Starts a region server thread running, using a fresh copy of the cluster configuration.
   * @return New RegionServerThread
   * @throws IOException if the region server cannot be added or started
   */
  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
    final Configuration newConf = HBaseConfiguration.create(conf);
    return startRegionServer(newConf);
  }
406
407  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
408    throws IOException {
409    User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++);
410    JVMClusterUtil.RegionServerThread t = null;
411    try {
412      t =
413        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
414      t.start();
415      t.waitForServerOnline();
416    } catch (InterruptedException ie) {
417      throw new IOException("Interrupted adding regionserver to cluster", ie);
418    }
419    return t;
420  }
421
422  /**
423   * Starts a region server thread and waits until its processed by master. Throws an exception when
424   * it can't start a region server or when the region server is not processed by master within the
425   * timeout.
426   * @return New RegionServerThread
427   */
428  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
429    throws IOException {
430
431    JVMClusterUtil.RegionServerThread t = startRegionServer();
432    ServerName rsServerName = t.getRegionServer().getServerName();
433
434    long start = EnvironmentEdgeManager.currentTime();
435    ClusterMetrics clusterStatus = getClusterMetrics();
436    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
437      if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
438        return t;
439      }
440      Threads.sleep(100);
441    }
442    if (t.getRegionServer().isOnline()) {
443      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
444    } else {
445      throw new IOException("RS: " + rsServerName + " is offline");
446    }
447  }
448
449  /**
450   * Cause a region server to exit doing basic clean up only on its way out.
451   * @param serverNumber Used as index into a list.
452   */
453  public String abortRegionServer(int serverNumber) {
454    HRegionServer server = getRegionServer(serverNumber);
455    LOG.info("Aborting " + server.toString());
456    server.abort("Aborting for tests", new Exception("Trace info"));
457    return server.toString();
458  }
459
  /**
   * Shut down the specified region server cleanly. Delegates to
   * {@link #stopRegionServer(int, boolean)} with {@code shutdownFS} set to true.
   * @param serverNumber Used as index into a list.
   * @return the region server that was stopped
   */
  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
    return stopRegionServer(serverNumber, true);
  }
468
469  /**
470   * Shut down the specified region server cleanly
471   * @param serverNumber Used as index into a list.
472   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
473   *                     shutdown. Usually we do but you do not want to do this if you are running
474   *                     multiple regionservers in a test and you shut down one before end of the
475   *                     test.
476   * @return the region server that was stopped
477   */
478  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
479    final boolean shutdownFS) {
480    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
481    LOG.info("Stopping " + server.toString());
482    server.getRegionServer().stop("Stopping rs " + serverNumber);
483    return server;
484  }
485
486  /**
487   * Suspend the specified region server
488   * @param serverNumber Used as index into a list.
489   */
490  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
491    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
492    LOG.info("Suspending {}", server.toString());
493    server.suspend();
494    return server;
495  }
496
497  /**
498   * Resume the specified region server
499   * @param serverNumber Used as index into a list.
500   */
501  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
502    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
503    LOG.info("Resuming {}", server.toString());
504    server.resume();
505    return server;
506  }
507
  /**
   * Wait for the specified region server to stop. Removes this thread from list of running threads.
   * Delegates to {@link LocalHBaseCluster#waitOnRegionServer(int)}.
   * @param serverNumber Used as index into a list.
   * @return Name of region server that just went down.
   */
  public String waitOnRegionServer(final int serverNumber) {
    return this.hbaseCluster.waitOnRegionServer(serverNumber);
  }
515
516  /**
517   * Starts a master thread running
518   * @return New RegionServerThread
519   */
520  public JVMClusterUtil.MasterThread startMaster() throws IOException {
521    Configuration c = HBaseConfiguration.create(conf);
522    User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++);
523
524    JVMClusterUtil.MasterThread t = null;
525    try {
526      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
527      t.start();
528    } catch (InterruptedException ie) {
529      throw new IOException("Interrupted adding master to cluster", ie);
530    }
531    conf.set(HConstants.MASTER_ADDRS_KEY,
532      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
533    return t;
534  }
535
  /**
   * Returns the current active master, if available. Delegates to
   * {@link LocalHBaseCluster#getActiveMaster()}.
   * @return the active HMaster, null if none is active.
   */
  public HMaster getMaster() {
    return this.hbaseCluster.getActiveMaster();
  }
543
544  /**
545   * Returns the current active master thread, if available.
546   * @return the active MasterThread, null if none is active.
547   */
548  public MasterThread getMasterThread() {
549    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
550      if (mt.getMaster().isActiveMaster()) {
551        return mt;
552      }
553    }
554    return null;
555  }
556
  /**
   * Returns the master at the specified index, whether or not it is the active one. Delegates to
   * {@link LocalHBaseCluster#getMaster(int)}.
   * @param serverNumber Used as index into a list.
   * @return the HMaster at that index
   */
  public HMaster getMaster(final int serverNumber) {
    return this.hbaseCluster.getMaster(serverNumber);
  }
564
565  /**
566   * Cause a master to exit without shutting down entire cluster.
567   * @param serverNumber Used as index into a list.
568   */
569  public String abortMaster(int serverNumber) {
570    HMaster server = getMaster(serverNumber);
571    LOG.info("Aborting " + server.toString());
572    server.abort("Aborting for tests", new Exception("Trace info"));
573    return server.toString();
574  }
575
  /**
   * Shut down the specified master cleanly. Delegates to {@link #stopMaster(int, boolean)} with
   * {@code shutdownFS} set to true.
   * @param serverNumber Used as index into a list.
   * @return the master that was stopped
   */
  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
    return stopMaster(serverNumber, true);
  }
584
585  /**
586   * Shut down the specified master cleanly
587   * @param serverNumber Used as index into a list.
588   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
589   *                     shutdown. Usually we do but you do not want to do this if you are running
590   *                     multiple master in a test and you shut down one before end of the test.
591   * @return the master that was stopped
592   */
593  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
594    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
595    LOG.info("Stopping " + server.toString());
596    server.getMaster().stop("Stopping master " + serverNumber);
597    return server;
598  }
599
  /**
   * Wait for the specified master to stop. Removes this thread from list of running threads.
   * Delegates to {@link LocalHBaseCluster#waitOnMaster(int)}.
   * @param serverNumber Used as index into a list.
   * @return Name of master that just went down.
   */
  public String waitOnMaster(final int serverNumber) {
    return this.hbaseCluster.waitOnMaster(serverNumber);
  }
607
608  /**
609   * Blocks until there is an active master and that master has completed initialization.
610   * @return true if an active master becomes available. false if there are no masters left.
611   */
612  @Override
613  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
614    long start = EnvironmentEdgeManager.currentTime();
615    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
616      for (JVMClusterUtil.MasterThread mt : getMasterThreads()) {
617        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
618          return true;
619        }
620      }
621      Threads.sleep(100);
622    }
623    return false;
624  }
625
  /**
   * Returns list of master threads, as reported by {@link LocalHBaseCluster#getMasters()}.
   */
  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
    return this.hbaseCluster.getMasters();
  }
632
  /**
   * Returns list of live master threads (skips the aborted and the killed), as reported by
   * {@link LocalHBaseCluster#getLiveMasters()}.
   */
  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
    return this.hbaseCluster.getLiveMasters();
  }
639
  /**
   * Wait for Mini HBase Cluster to shut down. Delegates to {@link LocalHBaseCluster#join()}.
   */
  public void join() {
    this.hbaseCluster.join();
  }
646
647  /**
648   * Shut down the mini HBase cluster
649   */
650  @Override
651  public void shutdown() throws IOException {
652    if (this.hbaseCluster != null) {
653      this.hbaseCluster.shutdown();
654    }
655  }
656
  /**
   * Intentionally does nothing in this implementation.
   */
  @Override
  public void close() throws IOException {
  }
660
661  @Override
662  public ClusterMetrics getClusterMetrics() throws IOException {
663    HMaster master = getMaster();
664    return master == null ? null : master.getClusterMetrics();
665  }
666
667  private void executeFlush(HRegion region) throws IOException {
668    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
669      return;
670    }
671    // retry 5 times if we can not flush
672    for (int i = 0; i < 5; i++) {
673      FlushResult result = region.flush(true);
674      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
675        return;
676      }
677      Threads.sleep(1000);
678    }
679  }
680
681  /**
682   * Call flushCache on all regions on all participating regionservers.
683   */
684  public void flushcache() throws IOException {
685    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
686      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
687        executeFlush(r);
688      }
689    }
690  }
691
692  /**
693   * Call flushCache on all regions of the specified table.
694   */
695  public void flushcache(TableName tableName) throws IOException {
696    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
697      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
698        if (r.getTableDescriptor().getTableName().equals(tableName)) {
699          executeFlush(r);
700        }
701      }
702    }
703  }
704
705  /**
706   * Call flushCache on all regions on all participating regionservers.
707   */
708  public void compact(boolean major) throws IOException {
709    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
710      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
711        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
712          r.compact(major);
713        }
714      }
715    }
716  }
717
718  /**
719   * Call flushCache on all regions of the specified table.
720   */
721  public void compact(TableName tableName, boolean major) throws IOException {
722    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
723      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
724        if (r.getTableDescriptor().getTableName().equals(tableName)) {
725          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
726            r.compact(major);
727          }
728        }
729      }
730    }
731  }
732
  /**
   * Returns number of live region servers in the cluster currently, i.e. the size of
   * {@link LocalHBaseCluster#getLiveRegionServers()}.
   */
  public int getNumLiveRegionServers() {
    return this.hbaseCluster.getLiveRegionServers().size();
  }
739
  /**
   * Returns list of region server threads, as reported by
   * {@link LocalHBaseCluster#getRegionServers()}. Does not return the master even though it is
   * also a region server.
   */
  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
    return this.hbaseCluster.getRegionServers();
  }
747
  /**
   * Returns List of live region server threads (skips the aborted and the killed), as reported by
   * {@link LocalHBaseCluster#getLiveRegionServers()}.
   */
  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
    return this.hbaseCluster.getLiveRegionServers();
  }
754
  /**
   * Grab a numbered region server of your choice. Delegates to
   * {@link LocalHBaseCluster#getRegionServer(int)}.
   * @param serverNumber Used as index into a list.
   * @return region server
   */
  public HRegionServer getRegionServer(int serverNumber) {
    return hbaseCluster.getRegionServer(serverNumber);
  }
762
763  public HRegionServer getRegionServer(ServerName serverName) {
764    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
765      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
766  }
767
  /**
   * Returns the online regions of the table whose name is given as bytes. Converts the bytes
   * with {@code TableName.valueOf} and delegates to {@link #getRegions(TableName)}.
   */
  public List<HRegion> getRegions(byte[] tableName) {
    return getRegions(TableName.valueOf(tableName));
  }
771
772  public List<HRegion> getRegions(TableName tableName) {
773    List<HRegion> ret = new ArrayList<>();
774    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
775      HRegionServer hrs = rst.getRegionServer();
776      for (Region region : hrs.getOnlineRegionsLocalContext()) {
777        if (region.getTableDescriptor().getTableName().equals(tableName)) {
778          ret.add((HRegion) region);
779        }
780      }
781    }
782    return ret;
783  }
784
  /**
   * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
   * carrying the region named by {@code RegionInfoBuilder.FIRST_META_REGIONINFO}. Returns -1 if
   * none found.
   */
  public int getServerWithMeta() {
    return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
  }
792
793  /**
794   * Get the location of the specified region
795   * @param regionName Name of the region in bytes
796   * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
797   *         carrying hbase:meta. Returns -1 if none found.
798   */
799  public int getServerWith(byte[] regionName) {
800    int index = 0;
801    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
802      HRegionServer hrs = rst.getRegionServer();
803      if (!hrs.isStopped()) {
804        Region region = hrs.getOnlineRegion(regionName);
805        if (region != null) {
806          return index;
807        }
808      }
809      index++;
810    }
811    return -1;
812  }
813
814  @Override
815  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
816    throws IOException {
817    int index = getServerWith(regionName);
818    if (index < 0) {
819      return null;
820    }
821    return getRegionServer(index).getServerName();
822  }
823
824  /**
825   * Counts the total numbers of regions being served by the currently online region servers by
826   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
827   * catalog tables.
828   * @return number of regions being served by all region servers
829   */
830  public long countServedRegions() {
831    long count = 0;
832    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
833      count += rst.getRegionServer().getNumberOfOnlineRegions();
834    }
835    return count;
836  }
837
838  /**
839   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
840   * mini-cluster back for clean shutdown.
841   */
842  public void killAll() {
843    // Do backups first.
844    MasterThread activeMaster = null;
845    for (MasterThread masterThread : getMasterThreads()) {
846      if (!masterThread.getMaster().isActiveMaster()) {
847        masterThread.getMaster().abort("killAll");
848      } else {
849        activeMaster = masterThread;
850      }
851    }
852    // Do active after.
853    if (activeMaster != null) {
854      activeMaster.getMaster().abort("killAll");
855    }
856    for (RegionServerThread rst : getRegionServerThreads()) {
857      rst.getRegionServer().abort("killAll");
858    }
859  }
860
  /**
   * Blocks until the underlying cluster has shut down by calling {@link LocalHBaseCluster#join()}.
   */
  @Override
  public void waitUntilShutDown() {
    this.hbaseCluster.join();
  }
865
866  public List<HRegion> findRegionsForTable(TableName tableName) {
867    ArrayList<HRegion> ret = new ArrayList<>();
868    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
869      HRegionServer hrs = rst.getRegionServer();
870      for (Region region : hrs.getRegions(tableName)) {
871        if (region.getTableDescriptor().getTableName().equals(tableName)) {
872          ret.add((HRegion) region);
873        }
874      }
875    }
876    return ret;
877  }
878
879  protected int getRegionServerIndex(ServerName serverName) {
880    // we have a small number of region servers, this should be fine for now.
881    List<RegionServerThread> servers = getRegionServerThreads();
882    for (int i = 0; i < servers.size(); i++) {
883      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
884        return i;
885      }
886    }
887    return -1;
888  }
889
890  protected int getMasterIndex(ServerName serverName) {
891    List<MasterThread> masters = getMasterThreads();
892    for (int i = 0; i < masters.size(); i++) {
893      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
894        return i;
895      }
896    }
897    return -1;
898  }
899}