001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.PrivilegedAction;
023import java.util.ArrayList;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
049
050/**
051 * This class creates a single process HBase cluster.
052 * each server.  The master uses the 'default' FileSystem.  The RegionServers,
053 * if we are running on DistributedFilesystem, create a FileSystem instance
054 * each and will close down their instance on the way out.
055 */
056@InterfaceAudience.Public
057public class MiniHBaseCluster extends HBaseCluster {
058  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
059  public LocalHBaseCluster hbaseCluster;
060  private static int index;
061
062  /**
063   * Start a MiniHBaseCluster.
064   * @param conf Configuration to be used for cluster
065   * @param numRegionServers initial number of region servers to start.
066   * @throws IOException
067   */
068  public MiniHBaseCluster(Configuration conf, int numRegionServers)
069  throws IOException, InterruptedException {
070    this(conf, 1, numRegionServers);
071  }
072
073  /**
074   * Start a MiniHBaseCluster.
075   * @param conf Configuration to be used for cluster
076   * @param numMasters initial number of masters to start.
077   * @param numRegionServers initial number of region servers to start.
078   * @throws IOException
079   */
080  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
081      throws IOException, InterruptedException {
082    this(conf, numMasters, numRegionServers, null, null);
083  }
084
085  /**
086   * Start a MiniHBaseCluster.
087   * @param conf Configuration to be used for cluster
088   * @param numMasters initial number of masters to start.
089   * @param numRegionServers initial number of region servers to start.
090   */
091  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
092         Class<? extends HMaster> masterClass,
093         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
094      throws IOException, InterruptedException {
095    this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass);
096  }
097
098  /**
099   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
100   *   restart where for sure the regionservers come up on same address+port (but
101   *   just with different startcode); by default mini hbase clusters choose new
102   *   arbitrary ports on each cluster start.
103   * @throws IOException
104   * @throws InterruptedException
105   */
106  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
107         List<Integer> rsPorts,
108         Class<? extends HMaster> masterClass,
109         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
110      throws IOException, InterruptedException {
111    super(conf);
112
113    // Hadoop 2
114    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
115
116    init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass);
117    this.initialClusterStatus = getClusterStatus();
118  }
119
120  public Configuration getConfiguration() {
121    return this.conf;
122  }
123
124  /**
125   * Subclass so can get at protected methods (none at moment).  Also, creates
126   * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
127   * on the way out. Shuts down own Filesystem only, not All filesystems as
128   * the FileSystem system exit hook does.
129   */
130  public static class MiniHBaseClusterRegionServer extends HRegionServer {
131    private Thread shutdownThread = null;
132    private User user = null;
133    /**
134     * List of RegionServers killed so far. ServerName also comprises startCode of a server,
135     * so any restarted instances of the same server will have different ServerName and will not
136     * coincide with past dead ones. So there's no need to cleanup this list.
137     */
138    static Set<ServerName> killedServers = new HashSet<>();
139
140    public MiniHBaseClusterRegionServer(Configuration conf)
141        throws IOException, InterruptedException {
142      super(conf);
143      this.user = User.getCurrent();
144    }
145
146    /*
147     * @param c
148     * @param currentfs We return this if we did not make a new one.
149     * @param uniqueName Same name used to help identify the created fs.
150     * @return A new fs instance if we are up on DistributeFileSystem.
151     * @throws IOException
152     */
153
154    @Override
155    protected void handleReportForDutyResponse(
156        final RegionServerStartupResponse c) throws IOException {
157      super.handleReportForDutyResponse(c);
158      // Run this thread to shutdown our filesystem on way out.
159      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
160    }
161
162    @Override
163    public void run() {
164      try {
165        this.user.runAs(new PrivilegedAction<Object>() {
166          @Override
167          public Object run() {
168            runRegionServer();
169            return null;
170          }
171        });
172      } catch (Throwable t) {
173        LOG.error("Exception in run", t);
174      } finally {
175        // Run this on the way out.
176        if (this.shutdownThread != null) {
177          this.shutdownThread.start();
178          Threads.shutdown(this.shutdownThread, 30000);
179        }
180      }
181    }
182
183    private void runRegionServer() {
184      super.run();
185    }
186
187    @Override
188    protected void kill() {
189      killedServers.add(getServerName());
190      super.kill();
191    }
192
193    @Override
194    public void abort(final String reason, final Throwable cause) {
195      this.user.runAs(new PrivilegedAction<Object>() {
196        @Override
197        public Object run() {
198          abortRegionServer(reason, cause);
199          return null;
200        }
201      });
202    }
203
204    private void abortRegionServer(String reason, Throwable cause) {
205      super.abort(reason, cause);
206    }
207  }
208
209  /**
210   * Alternate shutdown hook.
211   * Just shuts down the passed fs, not all as default filesystem hook does.
212   */
213  static class SingleFileSystemShutdownThread extends Thread {
214    private final FileSystem fs;
215    SingleFileSystemShutdownThread(final FileSystem fs) {
216      super("Shutdown of " + fs);
217      this.fs = fs;
218    }
219    @Override
220    public void run() {
221      try {
222        LOG.info("Hook closing fs=" + this.fs);
223        this.fs.close();
224      } catch (NullPointerException npe) {
225        LOG.debug("Need to fix these: " + npe.toString());
226      } catch (IOException e) {
227        LOG.warn("Running hook", e);
228      }
229    }
230  }
231
232  private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts,
233                 Class<? extends HMaster> masterClass,
234                 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
235  throws IOException, InterruptedException {
236    try {
237      if (masterClass == null){
238        masterClass =  HMaster.class;
239      }
240      if (regionserverClass == null){
241        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
242      }
243
244      // start up a LocalHBaseCluster
245      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
246          masterClass, regionserverClass);
247
248      // manually add the regionservers as other users
249      for (int i = 0; i < nRegionNodes; i++) {
250        Configuration rsConf = HBaseConfiguration.create(conf);
251        if (rsPorts != null) {
252          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
253        }
254        User user = HBaseTestingUtility.getDifferentUser(rsConf,
255            ".hfs."+index++);
256        hbaseCluster.addRegionServer(rsConf, i, user);
257      }
258
259      hbaseCluster.startup();
260    } catch (IOException e) {
261      shutdown();
262      throw e;
263    } catch (Throwable t) {
264      LOG.error("Error starting cluster", t);
265      shutdown();
266      throw new IOException("Shutting down", t);
267    }
268  }
269
270  @Override
271  public void startRegionServer(String hostname, int port) throws IOException {
272    final Configuration newConf = HBaseConfiguration.create(conf);
273    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
274    startRegionServer(newConf);
275  }
276
277  @Override
278  public void killRegionServer(ServerName serverName) throws IOException {
279    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
280    if (server instanceof MiniHBaseClusterRegionServer) {
281      LOG.info("Killing " + server.toString());
282      ((MiniHBaseClusterRegionServer) server).kill();
283    } else {
284      abortRegionServer(getRegionServerIndex(serverName));
285    }
286  }
287
288  @Override
289  public boolean isKilledRS(ServerName serverName) {
290    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
291  }
292
293  @Override
294  public void stopRegionServer(ServerName serverName) throws IOException {
295    stopRegionServer(getRegionServerIndex(serverName));
296  }
297
298  @Override
299  public void suspendRegionServer(ServerName serverName) throws IOException {
300    suspendRegionServer(getRegionServerIndex(serverName));
301  }
302
303  @Override
304  public void resumeRegionServer(ServerName serverName) throws IOException {
305    resumeRegionServer(getRegionServerIndex(serverName));
306  }
307
308  @Override
309  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
310    //ignore timeout for now
311    waitOnRegionServer(getRegionServerIndex(serverName));
312  }
313
314  @Override
315  public void startZkNode(String hostname, int port) throws IOException {
316    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
317  }
318
319  @Override
320  public void killZkNode(ServerName serverName) throws IOException {
321    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
322  }
323
324  @Override
325  public void stopZkNode(ServerName serverName) throws IOException {
326    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
327  }
328
329  @Override
330  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
331    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
332  }
333
334  @Override
335  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
336    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
337  }
338
339  @Override
340  public void startDataNode(ServerName serverName) throws IOException {
341    LOG.warn("Starting datanodes on mini cluster is not supported");
342  }
343
344  @Override
345  public void killDataNode(ServerName serverName) throws IOException {
346    LOG.warn("Aborting datanodes on mini cluster is not supported");
347  }
348
349  @Override
350  public void stopDataNode(ServerName serverName) throws IOException {
351    LOG.warn("Stopping datanodes on mini cluster is not supported");
352  }
353
354  @Override
355  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
356    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
357  }
358
359  @Override
360  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
361    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
362  }
363
364  @Override
365  public void startNameNode(ServerName serverName) throws IOException {
366    LOG.warn("Starting namenodes on mini cluster is not supported");
367  }
368
369  @Override
370  public void killNameNode(ServerName serverName) throws IOException {
371    LOG.warn("Aborting namenodes on mini cluster is not supported");
372  }
373
374  @Override
375  public void stopNameNode(ServerName serverName) throws IOException {
376    LOG.warn("Stopping namenodes on mini cluster is not supported");
377  }
378
379  @Override
380  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
381    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
382  }
383
384  @Override
385  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
386    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
387  }
388
389  @Override
390  public void startMaster(String hostname, int port) throws IOException {
391    this.startMaster();
392  }
393
394  @Override
395  public void killMaster(ServerName serverName) throws IOException {
396    abortMaster(getMasterIndex(serverName));
397  }
398
399  @Override
400  public void stopMaster(ServerName serverName) throws IOException {
401    stopMaster(getMasterIndex(serverName));
402  }
403
404  @Override
405  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
406    //ignore timeout for now
407    waitOnMaster(getMasterIndex(serverName));
408  }
409
410  /**
411   * Starts a region server thread running
412   *
413   * @throws IOException
414   * @return New RegionServerThread
415   */
416  public JVMClusterUtil.RegionServerThread startRegionServer()
417      throws IOException {
418    final Configuration newConf = HBaseConfiguration.create(conf);
419    return startRegionServer(newConf);
420  }
421
422  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
423      throws IOException {
424    User rsUser =
425        HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++);
426    JVMClusterUtil.RegionServerThread t =  null;
427    try {
428      t = hbaseCluster.addRegionServer(
429          configuration, hbaseCluster.getRegionServers().size(), rsUser);
430      t.start();
431      t.waitForServerOnline();
432    } catch (InterruptedException ie) {
433      throw new IOException("Interrupted adding regionserver to cluster", ie);
434    }
435    return t;
436  }
437
438  /**
439   * Starts a region server thread and waits until its processed by master. Throws an exception
440   * when it can't start a region server or when the region server is not processed by master
441   * within the timeout.
442   *
443   * @return New RegionServerThread
444   */
445  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
446      throws IOException {
447
448    JVMClusterUtil.RegionServerThread t =  startRegionServer();
449    ServerName rsServerName = t.getRegionServer().getServerName();
450
451    long start = System.currentTimeMillis();
452    ClusterStatus clusterStatus = getClusterStatus();
453    while ((System.currentTimeMillis() - start) < timeout) {
454      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
455        return t;
456      }
457      Threads.sleep(100);
458    }
459    if (t.getRegionServer().isOnline()) {
460      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
461    } else {
462      throw new IOException("RS: " + rsServerName + " is offline");
463    }
464  }
465
466  /**
467   * Cause a region server to exit doing basic clean up only on its way out.
468   * @param serverNumber  Used as index into a list.
469   */
470  public String abortRegionServer(int serverNumber) {
471    HRegionServer server = getRegionServer(serverNumber);
472    LOG.info("Aborting " + server.toString());
473    server.abort("Aborting for tests", new Exception("Trace info"));
474    return server.toString();
475  }
476
477  /**
478   * Shut down the specified region server cleanly
479   *
480   * @param serverNumber  Used as index into a list.
481   * @return the region server that was stopped
482   */
483  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
484    return stopRegionServer(serverNumber, true);
485  }
486
487  /**
488   * Shut down the specified region server cleanly
489   *
490   * @param serverNumber  Used as index into a list.
491   * @param shutdownFS True is we are to shutdown the filesystem as part of this
492   * regionserver's shutdown.  Usually we do but you do not want to do this if
493   * you are running multiple regionservers in a test and you shut down one
494   * before end of the test.
495   * @return the region server that was stopped
496   */
497  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
498      final boolean shutdownFS) {
499    JVMClusterUtil.RegionServerThread server =
500      hbaseCluster.getRegionServers().get(serverNumber);
501    LOG.info("Stopping " + server.toString());
502    server.getRegionServer().stop("Stopping rs " + serverNumber);
503    return server;
504  }
505
506  /**
507   * Suspend the specified region server
508   * @param serverNumber Used as index into a list.
509   * @return
510   */
511  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
512    JVMClusterUtil.RegionServerThread server =
513        hbaseCluster.getRegionServers().get(serverNumber);
514    LOG.info("Suspending {}", server.toString());
515    server.suspend();
516    return server;
517  }
518
519  /**
520   * Resume the specified region server
521   * @param serverNumber Used as index into a list.
522   * @return
523   */
524  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
525    JVMClusterUtil.RegionServerThread server =
526        hbaseCluster.getRegionServers().get(serverNumber);
527    LOG.info("Resuming {}", server.toString());
528    server.resume();
529    return server;
530  }
531
532  /**
533   * Wait for the specified region server to stop. Removes this thread from list
534   * of running threads.
535   * @param serverNumber
536   * @return Name of region server that just went down.
537   */
538  public String waitOnRegionServer(final int serverNumber) {
539    return this.hbaseCluster.waitOnRegionServer(serverNumber);
540  }
541
542
543  /**
544   * Starts a master thread running
545   *
546   * @return New RegionServerThread
547   */
548  public JVMClusterUtil.MasterThread startMaster() throws IOException {
549    Configuration c = HBaseConfiguration.create(conf);
550    User user =
551        HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
552
553    JVMClusterUtil.MasterThread t = null;
554    try {
555      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
556      t.start();
557    } catch (InterruptedException ie) {
558      throw new IOException("Interrupted adding master to cluster", ie);
559    }
560    return t;
561  }
562
563  /**
564   * Returns the current active master, if available.
565   * @return the active HMaster, null if none is active.
566   */
567  @Override
568  public MasterService.BlockingInterface getMasterAdminService() {
569    return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
570  }
571
572  /**
573   * Returns the current active master, if available.
574   * @return the active HMaster, null if none is active.
575   */
576  public HMaster getMaster() {
577    return this.hbaseCluster.getActiveMaster();
578  }
579
580  /**
581   * Returns the current active master thread, if available.
582   * @return the active MasterThread, null if none is active.
583   */
584  public MasterThread getMasterThread() {
585    for (MasterThread mt: hbaseCluster.getLiveMasters()) {
586      if (mt.getMaster().isActiveMaster()) {
587        return mt;
588      }
589    }
590    return null;
591  }
592
593  /**
594   * Returns the master at the specified index, if available.
595   * @return the active HMaster, null if none is active.
596   */
597  public HMaster getMaster(final int serverNumber) {
598    return this.hbaseCluster.getMaster(serverNumber);
599  }
600
601  /**
602   * Cause a master to exit without shutting down entire cluster.
603   * @param serverNumber  Used as index into a list.
604   */
605  public String abortMaster(int serverNumber) {
606    HMaster server = getMaster(serverNumber);
607    LOG.info("Aborting " + server.toString());
608    server.abort("Aborting for tests", new Exception("Trace info"));
609    return server.toString();
610  }
611
612  /**
613   * Shut down the specified master cleanly
614   *
615   * @param serverNumber  Used as index into a list.
616   * @return the region server that was stopped
617   */
618  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
619    return stopMaster(serverNumber, true);
620  }
621
622  /**
623   * Shut down the specified master cleanly
624   *
625   * @param serverNumber  Used as index into a list.
626   * @param shutdownFS True is we are to shutdown the filesystem as part of this
627   * master's shutdown.  Usually we do but you do not want to do this if
628   * you are running multiple master in a test and you shut down one
629   * before end of the test.
630   * @return the master that was stopped
631   */
632  public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
633      final boolean shutdownFS) {
634    JVMClusterUtil.MasterThread server =
635      hbaseCluster.getMasters().get(serverNumber);
636    LOG.info("Stopping " + server.toString());
637    server.getMaster().stop("Stopping master " + serverNumber);
638    return server;
639  }
640
641  /**
642   * Wait for the specified master to stop. Removes this thread from list
643   * of running threads.
644   * @param serverNumber
645   * @return Name of master that just went down.
646   */
647  public String waitOnMaster(final int serverNumber) {
648    return this.hbaseCluster.waitOnMaster(serverNumber);
649  }
650
651  /**
652   * Blocks until there is an active master and that master has completed
653   * initialization.
654   *
655   * @return true if an active master becomes available.  false if there are no
656   *         masters left.
657   * @throws InterruptedException
658   */
659  @Override
660  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
661    List<JVMClusterUtil.MasterThread> mts;
662    long start = System.currentTimeMillis();
663    while (!(mts = getMasterThreads()).isEmpty()
664        && (System.currentTimeMillis() - start) < timeout) {
665      for (JVMClusterUtil.MasterThread mt : mts) {
666        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
667          return true;
668        }
669      }
670
671      Threads.sleep(100);
672    }
673    return false;
674  }
675
676  /**
677   * @return List of master threads.
678   */
679  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
680    return this.hbaseCluster.getMasters();
681  }
682
683  /**
684   * @return List of live master threads (skips the aborted and the killed)
685   */
686  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
687    return this.hbaseCluster.getLiveMasters();
688  }
689
690  /**
691   * Wait for Mini HBase Cluster to shut down.
692   */
693  public void join() {
694    this.hbaseCluster.join();
695  }
696
697  /**
698   * Shut down the mini HBase cluster
699   */
700  @Override
701  public void shutdown() throws IOException {
702    if (this.hbaseCluster != null) {
703      this.hbaseCluster.shutdown();
704    }
705  }
706
707  @Override
708  public void close() throws IOException {
709  }
710
711  /**
712   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
713   *             Use {@link #getClusterMetrics()} instead.
714   */
715  @Deprecated
716  public ClusterStatus getClusterStatus() throws IOException {
717    HMaster master = getMaster();
718    return master == null ? null : new ClusterStatus(master.getClusterMetrics());
719  }
720
721  @Override
722  public ClusterMetrics getClusterMetrics() throws IOException {
723    HMaster master = getMaster();
724    return master == null ? null : master.getClusterMetrics();
725  }
726
727  private void executeFlush(HRegion region) throws IOException {
728    // retry 5 times if we can not flush
729    for (int i = 0; i < 5; i++) {
730      FlushResult result = region.flush(true);
731      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
732        return;
733      }
734      Threads.sleep(1000);
735    }
736  }
737
738  /**
739   * Call flushCache on all regions on all participating regionservers.
740   */
741  public void flushcache() throws IOException {
742    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
743      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
744        executeFlush(r);
745      }
746    }
747  }
748
749  /**
750   * Call flushCache on all regions of the specified table.
751   */
752  public void flushcache(TableName tableName) throws IOException {
753    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
754      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
755        if (r.getTableDescriptor().getTableName().equals(tableName)) {
756          executeFlush(r);
757        }
758      }
759    }
760  }
761
762  /**
763   * Call flushCache on all regions on all participating regionservers.
764   * @throws IOException
765   */
766  public void compact(boolean major) throws IOException {
767    for (JVMClusterUtil.RegionServerThread t:
768        this.hbaseCluster.getRegionServers()) {
769      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
770        r.compact(major);
771      }
772    }
773  }
774
775  /**
776   * Call flushCache on all regions of the specified table.
777   * @throws IOException
778   */
779  public void compact(TableName tableName, boolean major) throws IOException {
780    for (JVMClusterUtil.RegionServerThread t:
781        this.hbaseCluster.getRegionServers()) {
782      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
783        if(r.getTableDescriptor().getTableName().equals(tableName)) {
784          r.compact(major);
785        }
786      }
787    }
788  }
789
790  /**
791   * @return Number of live region servers in the cluster currently.
792   */
793  public int getNumLiveRegionServers() {
794    return this.hbaseCluster.getLiveRegionServers().size();
795  }
796
797  /**
798   * @return List of region server threads. Does not return the master even though it is also
799   * a region server.
800   */
801  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
802    return this.hbaseCluster.getRegionServers();
803  }
804
805  /**
806   * @return List of live region server threads (skips the aborted and the killed)
807   */
808  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
809    return this.hbaseCluster.getLiveRegionServers();
810  }
811
812  /**
813   * Grab a numbered region server of your choice.
814   * @param serverNumber
815   * @return region server
816   */
817  public HRegionServer getRegionServer(int serverNumber) {
818    return hbaseCluster.getRegionServer(serverNumber);
819  }
820
821  public HRegionServer getRegionServer(ServerName serverName) {
822    return hbaseCluster.getRegionServers().stream()
823        .map(t -> t.getRegionServer())
824        .filter(r -> r.getServerName().equals(serverName))
825        .findFirst().orElse(null);
826  }
827
828  public List<HRegion> getRegions(byte[] tableName) {
829    return getRegions(TableName.valueOf(tableName));
830  }
831
832  public List<HRegion> getRegions(TableName tableName) {
833    List<HRegion> ret = new ArrayList<>();
834    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
835      HRegionServer hrs = rst.getRegionServer();
836      for (Region region : hrs.getOnlineRegionsLocalContext()) {
837        if (region.getTableDescriptor().getTableName().equals(tableName)) {
838          ret.add((HRegion)region);
839        }
840      }
841    }
842    return ret;
843  }
844
845  /**
846   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
847   * of HRS carrying regionName. Returns -1 if none found.
848   */
849  public int getServerWithMeta() {
850    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
851  }
852
853  /**
854   * Get the location of the specified region
855   * @param regionName Name of the region in bytes
856   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
857   * of HRS carrying hbase:meta. Returns -1 if none found.
858   */
859  public int getServerWith(byte[] regionName) {
860    int index = -1;
861    int count = 0;
862    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
863      HRegionServer hrs = rst.getRegionServer();
864      if (!hrs.isStopped()) {
865        Region region = hrs.getOnlineRegion(regionName);
866        if (region != null) {
867          index = count;
868          break;
869        }
870      }
871      count++;
872    }
873    return index;
874  }
875
876  @Override
877  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
878  throws IOException {
879    // Assume there is only one master thread which is the active master.
880    // If there are multiple master threads, the backup master threads
881    // should hold some regions. Please refer to #countServedRegions
882    // to see how we find out all regions.
883    HMaster master = getMaster();
884    Region region = master.getOnlineRegion(regionName);
885    if (region != null) {
886      return master.getServerName();
887    }
888    int index = getServerWith(regionName);
889    if (index < 0) {
890      return null;
891    }
892    return getRegionServer(index).getServerName();
893  }
894
895  /**
896   * Counts the total numbers of regions being served by the currently online
897   * region servers by asking each how many regions they have.  Does not look
898   * at hbase:meta at all.  Count includes catalog tables.
899   * @return number of regions being served by all region servers
900   */
901  public long countServedRegions() {
902    long count = 0;
903    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
904      count += rst.getRegionServer().getNumberOfOnlineRegions();
905    }
906    for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
907      count += mt.getMaster().getNumberOfOnlineRegions();
908    }
909    return count;
910  }
911
912  /**
913   * Do a simulated kill all masters and regionservers. Useful when it is
914   * impossible to bring the mini-cluster back for clean shutdown.
915   */
916  public void killAll() {
917    // Do backups first.
918    MasterThread activeMaster = null;
919    for (MasterThread masterThread : getMasterThreads()) {
920      if (!masterThread.getMaster().isActiveMaster()) {
921        masterThread.getMaster().abort("killAll");
922      } else {
923        activeMaster = masterThread;
924      }
925    }
926    // Do active after.
927    if (activeMaster != null) {
928      activeMaster.getMaster().abort("killAll");
929    }
930    for (RegionServerThread rst : getRegionServerThreads()) {
931      rst.getRegionServer().abort("killAll");
932    }
933  }
934
935  @Override
936  public void waitUntilShutDown() {
937    this.hbaseCluster.join();
938  }
939
940  public List<HRegion> findRegionsForTable(TableName tableName) {
941    ArrayList<HRegion> ret = new ArrayList<>();
942    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
943      HRegionServer hrs = rst.getRegionServer();
944      for (Region region : hrs.getRegions(tableName)) {
945        if (region.getTableDescriptor().getTableName().equals(tableName)) {
946          ret.add((HRegion)region);
947        }
948      }
949    }
950    return ret;
951  }
952
953
954  protected int getRegionServerIndex(ServerName serverName) {
955    //we have a small number of region servers, this should be fine for now.
956    List<RegionServerThread> servers = getRegionServerThreads();
957    for (int i=0; i < servers.size(); i++) {
958      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
959        return i;
960      }
961    }
962    return -1;
963  }
964
965  protected int getMasterIndex(ServerName serverName) {
966    List<MasterThread> masters = getMasterThreads();
967    for (int i = 0; i < masters.size(); i++) {
968      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
969        return i;
970      }
971    }
972    return -1;
973  }
974
975  @Override
976  public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
977    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
978  }
979
980  @Override
981  public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
982  throws IOException {
983    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
984  }
985}