001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.PrivilegedAction;
023import java.util.ArrayList;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.regionserver.HRegion;
031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.Region;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.test.MetricsAssertHelper;
036import org.apache.hadoop.hbase.util.JVMClusterUtil;
037import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
048
049/**
050 * This class creates a single process HBase cluster.
051 * each server.  The master uses the 'default' FileSystem.  The RegionServers,
052 * if we are running on DistributedFilesystem, create a FileSystem instance
053 * each and will close down their instance on the way out.
054 */
055@InterfaceAudience.Public
056public class MiniHBaseCluster extends HBaseCluster {
057  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
058  public LocalHBaseCluster hbaseCluster;
059  private static int index;
060
061  /**
062   * Start a MiniHBaseCluster.
063   * @param conf Configuration to be used for cluster
064   * @param numRegionServers initial number of region servers to start.
065   * @throws IOException
066   */
067  public MiniHBaseCluster(Configuration conf, int numRegionServers)
068  throws IOException, InterruptedException {
069    this(conf, 1, numRegionServers);
070  }
071
072  /**
073   * Start a MiniHBaseCluster.
074   * @param conf Configuration to be used for cluster
075   * @param numMasters initial number of masters to start.
076   * @param numRegionServers initial number of region servers to start.
077   * @throws IOException
078   */
079  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
080      throws IOException, InterruptedException {
081    this(conf, numMasters, numRegionServers, null, null);
082  }
083
084  /**
085   * Start a MiniHBaseCluster.
086   * @param conf Configuration to be used for cluster
087   * @param numMasters initial number of masters to start.
088   * @param numRegionServers initial number of region servers to start.
089   */
090  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
091         Class<? extends HMaster> masterClass,
092         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
093      throws IOException, InterruptedException {
094    this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass);
095  }
096
097  /**
098   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
099   *   restart where for sure the regionservers come up on same address+port (but
100   *   just with different startcode); by default mini hbase clusters choose new
101   *   arbitrary ports on each cluster start.
102   * @throws IOException
103   * @throws InterruptedException
104   */
105  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
106         List<Integer> rsPorts,
107         Class<? extends HMaster> masterClass,
108         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
109      throws IOException, InterruptedException {
110    super(conf);
111
112    // Hadoop 2
113    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
114
115    init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass);
116    this.initialClusterStatus = getClusterStatus();
117  }
118
119  public Configuration getConfiguration() {
120    return this.conf;
121  }
122
123  /**
124   * Subclass so can get at protected methods (none at moment).  Also, creates
125   * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
126   * on the way out. Shuts down own Filesystem only, not All filesystems as
127   * the FileSystem system exit hook does.
128   */
129  public static class MiniHBaseClusterRegionServer extends HRegionServer {
130    private Thread shutdownThread = null;
131    private User user = null;
132    /**
133     * List of RegionServers killed so far. ServerName also comprises startCode of a server,
134     * so any restarted instances of the same server will have different ServerName and will not
135     * coincide with past dead ones. So there's no need to cleanup this list.
136     */
137    static Set<ServerName> killedServers = new HashSet<>();
138
139    public MiniHBaseClusterRegionServer(Configuration conf)
140        throws IOException, InterruptedException {
141      super(conf);
142      this.user = User.getCurrent();
143    }
144
145    /*
146     * @param c
147     * @param currentfs We return this if we did not make a new one.
148     * @param uniqueName Same name used to help identify the created fs.
149     * @return A new fs instance if we are up on DistributeFileSystem.
150     * @throws IOException
151     */
152
153    @Override
154    protected void handleReportForDutyResponse(
155        final RegionServerStartupResponse c) throws IOException {
156      super.handleReportForDutyResponse(c);
157      // Run this thread to shutdown our filesystem on way out.
158      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
159    }
160
161    @Override
162    public void run() {
163      try {
164        this.user.runAs(new PrivilegedAction<Object>() {
165          @Override
166          public Object run() {
167            runRegionServer();
168            return null;
169          }
170        });
171      } catch (Throwable t) {
172        LOG.error("Exception in run", t);
173      } finally {
174        // Run this on the way out.
175        if (this.shutdownThread != null) {
176          this.shutdownThread.start();
177          Threads.shutdown(this.shutdownThread, 30000);
178        }
179      }
180    }
181
182    private void runRegionServer() {
183      super.run();
184    }
185
186    @Override
187    protected void kill() {
188      killedServers.add(getServerName());
189      super.kill();
190    }
191
192    @Override
193    public void abort(final String reason, final Throwable cause) {
194      this.user.runAs(new PrivilegedAction<Object>() {
195        @Override
196        public Object run() {
197          abortRegionServer(reason, cause);
198          return null;
199        }
200      });
201    }
202
203    private void abortRegionServer(String reason, Throwable cause) {
204      super.abort(reason, cause);
205    }
206  }
207
208  /**
209   * Alternate shutdown hook.
210   * Just shuts down the passed fs, not all as default filesystem hook does.
211   */
212  static class SingleFileSystemShutdownThread extends Thread {
213    private final FileSystem fs;
214    SingleFileSystemShutdownThread(final FileSystem fs) {
215      super("Shutdown of " + fs);
216      this.fs = fs;
217    }
218    @Override
219    public void run() {
220      try {
221        LOG.info("Hook closing fs=" + this.fs);
222        this.fs.close();
223      } catch (NullPointerException npe) {
224        LOG.debug("Need to fix these: " + npe.toString());
225      } catch (IOException e) {
226        LOG.warn("Running hook", e);
227      }
228    }
229  }
230
231  private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts,
232                 Class<? extends HMaster> masterClass,
233                 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
234  throws IOException, InterruptedException {
235    try {
236      if (masterClass == null){
237        masterClass =  HMaster.class;
238      }
239      if (regionserverClass == null){
240        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
241      }
242
243      // start up a LocalHBaseCluster
244      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
245          masterClass, regionserverClass);
246
247      // manually add the regionservers as other users
248      for (int i = 0; i < nRegionNodes; i++) {
249        Configuration rsConf = HBaseConfiguration.create(conf);
250        if (rsPorts != null) {
251          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
252        }
253        User user = HBaseTestingUtility.getDifferentUser(rsConf,
254            ".hfs."+index++);
255        hbaseCluster.addRegionServer(rsConf, i, user);
256      }
257
258      hbaseCluster.startup();
259    } catch (IOException e) {
260      shutdown();
261      throw e;
262    } catch (Throwable t) {
263      LOG.error("Error starting cluster", t);
264      shutdown();
265      throw new IOException("Shutting down", t);
266    }
267  }
268
269  @Override
270  public void startRegionServer(String hostname, int port) throws IOException {
271    this.startRegionServer();
272  }
273
274  @Override
275  public void killRegionServer(ServerName serverName) throws IOException {
276    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
277    if (server instanceof MiniHBaseClusterRegionServer) {
278      LOG.info("Killing " + server.toString());
279      ((MiniHBaseClusterRegionServer) server).kill();
280    } else {
281      abortRegionServer(getRegionServerIndex(serverName));
282    }
283  }
284
285  @Override
286  public boolean isKilledRS(ServerName serverName) {
287    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
288  }
289
290  @Override
291  public void stopRegionServer(ServerName serverName) throws IOException {
292    stopRegionServer(getRegionServerIndex(serverName));
293  }
294
295  @Override
296  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
297    //ignore timeout for now
298    waitOnRegionServer(getRegionServerIndex(serverName));
299  }
300
301  @Override
302  public void startZkNode(String hostname, int port) throws IOException {
303    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
304  }
305
306  @Override
307  public void killZkNode(ServerName serverName) throws IOException {
308    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
309  }
310
311  @Override
312  public void stopZkNode(ServerName serverName) throws IOException {
313    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
314  }
315
316  @Override
317  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
318    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
319  }
320
321  @Override
322  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
323    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
324  }
325
326  @Override
327  public void startDataNode(ServerName serverName) throws IOException {
328    LOG.warn("Starting datanodes on mini cluster is not supported");
329  }
330
331  @Override
332  public void killDataNode(ServerName serverName) throws IOException {
333    LOG.warn("Aborting datanodes on mini cluster is not supported");
334  }
335
336  @Override
337  public void stopDataNode(ServerName serverName) throws IOException {
338    LOG.warn("Stopping datanodes on mini cluster is not supported");
339  }
340
341  @Override
342  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
343    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
344  }
345
346  @Override
347  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
348    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
349  }
350
351  @Override
352  public void startMaster(String hostname, int port) throws IOException {
353    this.startMaster();
354  }
355
356  @Override
357  public void killMaster(ServerName serverName) throws IOException {
358    abortMaster(getMasterIndex(serverName));
359  }
360
361  @Override
362  public void stopMaster(ServerName serverName) throws IOException {
363    stopMaster(getMasterIndex(serverName));
364  }
365
366  @Override
367  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
368    //ignore timeout for now
369    waitOnMaster(getMasterIndex(serverName));
370  }
371
372  /**
373   * Starts a region server thread running
374   *
375   * @throws IOException
376   * @return New RegionServerThread
377   */
378  public JVMClusterUtil.RegionServerThread startRegionServer()
379      throws IOException {
380    final Configuration newConf = HBaseConfiguration.create(conf);
381    User rsUser =
382        HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
383    JVMClusterUtil.RegionServerThread t =  null;
384    try {
385      t = hbaseCluster.addRegionServer(
386          newConf, hbaseCluster.getRegionServers().size(), rsUser);
387      t.start();
388      t.waitForServerOnline();
389    } catch (InterruptedException ie) {
390      throw new IOException("Interrupted adding regionserver to cluster", ie);
391    }
392    return t;
393  }
394
395  /**
396   * Starts a region server thread and waits until its processed by master. Throws an exception
397   * when it can't start a region server or when the region server is not processed by master
398   * within the timeout.
399   *
400   * @return New RegionServerThread
401   */
402  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
403      throws IOException {
404
405    JVMClusterUtil.RegionServerThread t =  startRegionServer();
406    ServerName rsServerName = t.getRegionServer().getServerName();
407
408    long start = System.currentTimeMillis();
409    ClusterStatus clusterStatus = getClusterStatus();
410    while ((System.currentTimeMillis() - start) < timeout) {
411      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
412        return t;
413      }
414      Threads.sleep(100);
415    }
416    if (t.getRegionServer().isOnline()) {
417      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
418    } else {
419      throw new IOException("RS: " + rsServerName + " is offline");
420    }
421  }
422
423  /**
424   * Cause a region server to exit doing basic clean up only on its way out.
425   * @param serverNumber  Used as index into a list.
426   */
427  public String abortRegionServer(int serverNumber) {
428    HRegionServer server = getRegionServer(serverNumber);
429    LOG.info("Aborting " + server.toString());
430    server.abort("Aborting for tests", new Exception("Trace info"));
431    return server.toString();
432  }
433
434  /**
435   * Shut down the specified region server cleanly
436   *
437   * @param serverNumber  Used as index into a list.
438   * @return the region server that was stopped
439   */
440  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
441    return stopRegionServer(serverNumber, true);
442  }
443
444  /**
445   * Shut down the specified region server cleanly
446   *
447   * @param serverNumber  Used as index into a list.
448   * @param shutdownFS True is we are to shutdown the filesystem as part of this
449   * regionserver's shutdown.  Usually we do but you do not want to do this if
450   * you are running multiple regionservers in a test and you shut down one
451   * before end of the test.
452   * @return the region server that was stopped
453   */
454  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
455      final boolean shutdownFS) {
456    JVMClusterUtil.RegionServerThread server =
457      hbaseCluster.getRegionServers().get(serverNumber);
458    LOG.info("Stopping " + server.toString());
459    server.getRegionServer().stop("Stopping rs " + serverNumber);
460    return server;
461  }
462
463  /**
464   * Wait for the specified region server to stop. Removes this thread from list
465   * of running threads.
466   * @param serverNumber
467   * @return Name of region server that just went down.
468   */
469  public String waitOnRegionServer(final int serverNumber) {
470    return this.hbaseCluster.waitOnRegionServer(serverNumber);
471  }
472
473
474  /**
475   * Starts a master thread running
476   *
477   * @return New RegionServerThread
478   */
479  public JVMClusterUtil.MasterThread startMaster() throws IOException {
480    Configuration c = HBaseConfiguration.create(conf);
481    User user =
482        HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
483
484    JVMClusterUtil.MasterThread t = null;
485    try {
486      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
487      t.start();
488    } catch (InterruptedException ie) {
489      throw new IOException("Interrupted adding master to cluster", ie);
490    }
491    return t;
492  }
493
494  /**
495   * Returns the current active master, if available.
496   * @return the active HMaster, null if none is active.
497   */
498  @Override
499  public MasterService.BlockingInterface getMasterAdminService() {
500    return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
501  }
502
503  /**
504   * Returns the current active master, if available.
505   * @return the active HMaster, null if none is active.
506   */
507  public HMaster getMaster() {
508    return this.hbaseCluster.getActiveMaster();
509  }
510
511  /**
512   * Returns the current active master thread, if available.
513   * @return the active MasterThread, null if none is active.
514   */
515  public MasterThread getMasterThread() {
516    for (MasterThread mt: hbaseCluster.getLiveMasters()) {
517      if (mt.getMaster().isActiveMaster()) {
518        return mt;
519      }
520    }
521    return null;
522  }
523
524  /**
525   * Returns the master at the specified index, if available.
526   * @return the active HMaster, null if none is active.
527   */
528  public HMaster getMaster(final int serverNumber) {
529    return this.hbaseCluster.getMaster(serverNumber);
530  }
531
532  /**
533   * Cause a master to exit without shutting down entire cluster.
534   * @param serverNumber  Used as index into a list.
535   */
536  public String abortMaster(int serverNumber) {
537    HMaster server = getMaster(serverNumber);
538    LOG.info("Aborting " + server.toString());
539    server.abort("Aborting for tests", new Exception("Trace info"));
540    return server.toString();
541  }
542
543  /**
544   * Shut down the specified master cleanly
545   *
546   * @param serverNumber  Used as index into a list.
547   * @return the region server that was stopped
548   */
549  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
550    return stopMaster(serverNumber, true);
551  }
552
553  /**
554   * Shut down the specified master cleanly
555   *
556   * @param serverNumber  Used as index into a list.
557   * @param shutdownFS True is we are to shutdown the filesystem as part of this
558   * master's shutdown.  Usually we do but you do not want to do this if
559   * you are running multiple master in a test and you shut down one
560   * before end of the test.
561   * @return the master that was stopped
562   */
563  public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
564      final boolean shutdownFS) {
565    JVMClusterUtil.MasterThread server =
566      hbaseCluster.getMasters().get(serverNumber);
567    LOG.info("Stopping " + server.toString());
568    server.getMaster().stop("Stopping master " + serverNumber);
569    return server;
570  }
571
572  /**
573   * Wait for the specified master to stop. Removes this thread from list
574   * of running threads.
575   * @param serverNumber
576   * @return Name of master that just went down.
577   */
578  public String waitOnMaster(final int serverNumber) {
579    return this.hbaseCluster.waitOnMaster(serverNumber);
580  }
581
582  /**
583   * Blocks until there is an active master and that master has completed
584   * initialization.
585   *
586   * @return true if an active master becomes available.  false if there are no
587   *         masters left.
588   * @throws InterruptedException
589   */
590  @Override
591  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
592    List<JVMClusterUtil.MasterThread> mts;
593    long start = System.currentTimeMillis();
594    while (!(mts = getMasterThreads()).isEmpty()
595        && (System.currentTimeMillis() - start) < timeout) {
596      for (JVMClusterUtil.MasterThread mt : mts) {
597        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
598          return true;
599        }
600      }
601
602      Threads.sleep(100);
603    }
604    return false;
605  }
606
607  /**
608   * @return List of master threads.
609   */
610  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
611    return this.hbaseCluster.getMasters();
612  }
613
614  /**
615   * @return List of live master threads (skips the aborted and the killed)
616   */
617  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
618    return this.hbaseCluster.getLiveMasters();
619  }
620
621  /**
622   * Wait for Mini HBase Cluster to shut down.
623   */
624  public void join() {
625    this.hbaseCluster.join();
626  }
627
628  /**
629   * Shut down the mini HBase cluster
630   */
631  @Override
632  public void shutdown() throws IOException {
633    if (this.hbaseCluster != null) {
634      this.hbaseCluster.shutdown();
635    }
636  }
637
638  @Override
639  public void close() throws IOException {
640  }
641
642  /**
643   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
644   *             Use {@link #getClusterMetrics()} instead.
645   */
646  @Deprecated
647  public ClusterStatus getClusterStatus() throws IOException {
648    HMaster master = getMaster();
649    return master == null ? null : new ClusterStatus(master.getClusterMetrics());
650  }
651
652  @Override
653  public ClusterMetrics getClusterMetrics() throws IOException {
654    HMaster master = getMaster();
655    return master == null ? null : master.getClusterMetrics();
656  }
657
658  private void executeFlush(HRegion region) throws IOException {
659    // retry 5 times if we can not flush
660    for (int i = 0; i < 5; i++) {
661      FlushResult result = region.flush(true);
662      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
663        return;
664      }
665      Threads.sleep(1000);
666    }
667  }
668
669  /**
670   * Call flushCache on all regions on all participating regionservers.
671   */
672  public void flushcache() throws IOException {
673    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
674      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
675        executeFlush(r);
676      }
677    }
678  }
679
680  /**
681   * Call flushCache on all regions of the specified table.
682   */
683  public void flushcache(TableName tableName) throws IOException {
684    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
685      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
686        if (r.getTableDescriptor().getTableName().equals(tableName)) {
687          executeFlush(r);
688        }
689      }
690    }
691  }
692
693  /**
694   * Call flushCache on all regions on all participating regionservers.
695   * @throws IOException
696   */
697  public void compact(boolean major) throws IOException {
698    for (JVMClusterUtil.RegionServerThread t:
699        this.hbaseCluster.getRegionServers()) {
700      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
701        r.compact(major);
702      }
703    }
704  }
705
706  /**
707   * Call flushCache on all regions of the specified table.
708   * @throws IOException
709   */
710  public void compact(TableName tableName, boolean major) throws IOException {
711    for (JVMClusterUtil.RegionServerThread t:
712        this.hbaseCluster.getRegionServers()) {
713      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
714        if(r.getTableDescriptor().getTableName().equals(tableName)) {
715          r.compact(major);
716        }
717      }
718    }
719  }
720
721  /**
722   * @return Number of live region servers in the cluster currently.
723   */
724  public int getNumLiveRegionServers() {
725    return this.hbaseCluster.getLiveRegionServers().size();
726  }
727
728  /**
729   * @return List of region server threads. Does not return the master even though it is also
730   * a region server.
731   */
732  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
733    return this.hbaseCluster.getRegionServers();
734  }
735
736  /**
737   * @return List of live region server threads (skips the aborted and the killed)
738   */
739  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
740    return this.hbaseCluster.getLiveRegionServers();
741  }
742
743  /**
744   * Grab a numbered region server of your choice.
745   * @param serverNumber
746   * @return region server
747   */
748  public HRegionServer getRegionServer(int serverNumber) {
749    return hbaseCluster.getRegionServer(serverNumber);
750  }
751
752  public HRegionServer getRegionServer(ServerName serverName) {
753    return hbaseCluster.getRegionServers().stream()
754        .map(t -> t.getRegionServer())
755        .filter(r -> r.getServerName().equals(serverName))
756        .findFirst().orElse(null);
757  }
758
759  public List<HRegion> getRegions(byte[] tableName) {
760    return getRegions(TableName.valueOf(tableName));
761  }
762
763  public List<HRegion> getRegions(TableName tableName) {
764    List<HRegion> ret = new ArrayList<>();
765    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
766      HRegionServer hrs = rst.getRegionServer();
767      for (Region region : hrs.getOnlineRegionsLocalContext()) {
768        if (region.getTableDescriptor().getTableName().equals(tableName)) {
769          ret.add((HRegion)region);
770        }
771      }
772    }
773    return ret;
774  }
775
776  /**
777   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
778   * of HRS carrying regionName. Returns -1 if none found.
779   */
780  public int getServerWithMeta() {
781    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
782  }
783
784  /**
785   * Get the location of the specified region
786   * @param regionName Name of the region in bytes
787   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
788   * of HRS carrying hbase:meta. Returns -1 if none found.
789   */
790  public int getServerWith(byte[] regionName) {
791    int index = -1;
792    int count = 0;
793    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
794      HRegionServer hrs = rst.getRegionServer();
795      if (!hrs.isStopped()) {
796        Region region = hrs.getOnlineRegion(regionName);
797        if (region != null) {
798          index = count;
799          break;
800        }
801      }
802      count++;
803    }
804    return index;
805  }
806
807  @Override
808  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
809  throws IOException {
810    // Assume there is only one master thread which is the active master.
811    // If there are multiple master threads, the backup master threads
812    // should hold some regions. Please refer to #countServedRegions
813    // to see how we find out all regions.
814    HMaster master = getMaster();
815    Region region = master.getOnlineRegion(regionName);
816    if (region != null) {
817      return master.getServerName();
818    }
819    int index = getServerWith(regionName);
820    if (index < 0) {
821      return null;
822    }
823    return getRegionServer(index).getServerName();
824  }
825
826  /**
827   * Counts the total numbers of regions being served by the currently online
828   * region servers by asking each how many regions they have.  Does not look
829   * at hbase:meta at all.  Count includes catalog tables.
830   * @return number of regions being served by all region servers
831   */
832  public long countServedRegions() {
833    long count = 0;
834    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
835      count += rst.getRegionServer().getNumberOfOnlineRegions();
836    }
837    for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
838      count += mt.getMaster().getNumberOfOnlineRegions();
839    }
840    return count;
841  }
842
843  /**
844   * Do a simulated kill all masters and regionservers. Useful when it is
845   * impossible to bring the mini-cluster back for clean shutdown.
846   */
847  public void killAll() {
848    // Do backups first.
849    MasterThread activeMaster = null;
850    for (MasterThread masterThread : getMasterThreads()) {
851      if (!masterThread.getMaster().isActiveMaster()) {
852        masterThread.getMaster().abort("killAll");
853      } else {
854        activeMaster = masterThread;
855      }
856    }
857    // Do active after.
858    if (activeMaster != null) {
859      activeMaster.getMaster().abort("killAll");
860    }
861    for (RegionServerThread rst : getRegionServerThreads()) {
862      rst.getRegionServer().abort("killAll");
863    }
864  }
865
866  @Override
867  public void waitUntilShutDown() {
868    this.hbaseCluster.join();
869  }
870
871  public List<HRegion> findRegionsForTable(TableName tableName) {
872    ArrayList<HRegion> ret = new ArrayList<>();
873    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
874      HRegionServer hrs = rst.getRegionServer();
875      for (Region region : hrs.getRegions(tableName)) {
876        if (region.getTableDescriptor().getTableName().equals(tableName)) {
877          ret.add((HRegion)region);
878        }
879      }
880    }
881    return ret;
882  }
883
884
885  protected int getRegionServerIndex(ServerName serverName) {
886    //we have a small number of region servers, this should be fine for now.
887    List<RegionServerThread> servers = getRegionServerThreads();
888    for (int i=0; i < servers.size(); i++) {
889      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
890        return i;
891      }
892    }
893    return -1;
894  }
895
896  protected int getMasterIndex(ServerName serverName) {
897    List<MasterThread> masters = getMasterThreads();
898    for (int i = 0; i < masters.size(); i++) {
899      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
900        return i;
901      }
902    }
903    return -1;
904  }
905
906  @Override
907  public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
908    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
909  }
910
911  @Override
912  public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
913  throws IOException {
914    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
915  }
916}