001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.RegionReplicaUtil;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.regionserver.HRegion;
031import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.regionserver.Region;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.test.MetricsAssertHelper;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
049
050/**
 * This class creates a single process HBase cluster, running each server in its own thread. The
 * master uses the 'default' FileSystem. The RegionServers, if we are running on a distributed
 * filesystem, each create their own FileSystem instance and close it down on the way out.
054 */
055@InterfaceAudience.Public
056public class MiniHBaseCluster extends HBaseCluster {
  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
  /** The in-process cluster that hosts the master and region server threads. */
  public LocalHBaseCluster hbaseCluster;
  /**
   * Counter used to build a distinct test-user suffix (".hfs." + index) for every master and region
   * server started. Static, so it is shared by all instances; it is incremented without any
   * synchronization.
   */
  private static int index;
060
061  /**
062   * Start a MiniHBaseCluster.
063   * @param conf             Configuration to be used for cluster
064   * @param numRegionServers initial number of region servers to start. n
065   */
066  public MiniHBaseCluster(Configuration conf, int numRegionServers)
067    throws IOException, InterruptedException {
068    this(conf, 1, numRegionServers);
069  }
070
071  /**
072   * Start a MiniHBaseCluster.
073   * @param conf             Configuration to be used for cluster
074   * @param numMasters       initial number of masters to start.
075   * @param numRegionServers initial number of region servers to start. n
076   */
077  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
078    throws IOException, InterruptedException {
079    this(conf, numMasters, numRegionServers, null, null);
080  }
081
082  /**
083   * Start a MiniHBaseCluster.
084   * @param conf             Configuration to be used for cluster
085   * @param numMasters       initial number of masters to start.
086   * @param numRegionServers initial number of region servers to start.
087   */
088  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
089    Class<? extends HMaster> masterClass,
090    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
091    throws IOException, InterruptedException {
092    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
093  }
094
095  /**
096   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
097   *                restart where for sure the regionservers come up on same address+port (but just
098   *                with different startcode); by default mini hbase clusters choose new arbitrary
099   *                ports on each cluster start. nn
100   */
101  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
102    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
103    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
104    throws IOException, InterruptedException {
105    super(conf);
106
107    // Hadoop 2
108    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
109
110    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
111      regionserverClass);
112    this.initialClusterStatus = getClusterMetrics();
113  }
114
115  public Configuration getConfiguration() {
116    return this.conf;
117  }
118
119  /**
120   * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
121   * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem
122   * only, not All filesystems as the FileSystem system exit hook does.
123   */
124  public static class MiniHBaseClusterRegionServer extends HRegionServer {
125    private Thread shutdownThread = null;
126    private User user = null;
127    /**
128     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
129     * restarted instances of the same server will have different ServerName and will not coincide
130     * with past dead ones. So there's no need to cleanup this list.
131     */
132    static Set<ServerName> killedServers = new HashSet<>();
133
134    public MiniHBaseClusterRegionServer(Configuration conf)
135      throws IOException, InterruptedException {
136      super(conf);
137      this.user = User.getCurrent();
138    }
139
140    /*
141     * n * @param currentfs We return this if we did not make a new one.
142     * @param uniqueName Same name used to help identify the created fs.
143     * @return A new fs instance if we are up on DistributeFileSystem. n
144     */
145
146    @Override
147    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
148      throws IOException {
149      super.handleReportForDutyResponse(c);
150      // Run this thread to shutdown our filesystem on way out.
151      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
152    }
153
154    @Override
155    public void run() {
156      try {
157        this.user.runAs(new PrivilegedAction<Object>() {
158          @Override
159          public Object run() {
160            runRegionServer();
161            return null;
162          }
163        });
164      } catch (Throwable t) {
165        LOG.error("Exception in run", t);
166      } finally {
167        // Run this on the way out.
168        if (this.shutdownThread != null) {
169          this.shutdownThread.start();
170          Threads.shutdown(this.shutdownThread, 30000);
171        }
172      }
173    }
174
175    private void runRegionServer() {
176      super.run();
177    }
178
179    @Override
180    protected void kill() {
181      killedServers.add(getServerName());
182      super.kill();
183    }
184
185    @Override
186    public void abort(final String reason, final Throwable cause) {
187      this.user.runAs(new PrivilegedAction<Object>() {
188        @Override
189        public Object run() {
190          abortRegionServer(reason, cause);
191          return null;
192        }
193      });
194    }
195
196    private void abortRegionServer(String reason, Throwable cause) {
197      super.abort(reason, cause);
198    }
199  }
200
201  /**
202   * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
203   * does.
204   */
205  static class SingleFileSystemShutdownThread extends Thread {
206    private final FileSystem fs;
207
208    SingleFileSystemShutdownThread(final FileSystem fs) {
209      super("Shutdown of " + fs);
210      this.fs = fs;
211    }
212
213    @Override
214    public void run() {
215      try {
216        LOG.info("Hook closing fs=" + this.fs);
217        this.fs.close();
218      } catch (NullPointerException npe) {
219        LOG.debug("Need to fix these: " + npe.toString());
220      } catch (IOException e) {
221        LOG.warn("Running hook", e);
222      }
223    }
224  }
225
226  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
227    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
228    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
229    throws IOException, InterruptedException {
230    try {
231      if (masterClass == null) {
232        masterClass = HMaster.class;
233      }
234      if (regionserverClass == null) {
235        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
236      }
237
238      // start up a LocalHBaseCluster
239      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
240        masterClass, regionserverClass);
241
242      // manually add the regionservers as other users
243      for (int i = 0; i < nRegionNodes; i++) {
244        Configuration rsConf = HBaseConfiguration.create(conf);
245        if (rsPorts != null) {
246          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
247        }
248        User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
249        hbaseCluster.addRegionServer(rsConf, i, user);
250      }
251
252      hbaseCluster.startup();
253    } catch (IOException e) {
254      shutdown();
255      throw e;
256    } catch (Throwable t) {
257      LOG.error("Error starting cluster", t);
258      shutdown();
259      throw new IOException("Shutting down", t);
260    }
261  }
262
263  @Override
264  public void startRegionServer(String hostname, int port) throws IOException {
265    final Configuration newConf = HBaseConfiguration.create(conf);
266    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
267    startRegionServer(newConf);
268  }
269
270  @Override
271  public void killRegionServer(ServerName serverName) throws IOException {
272    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
273    if (server instanceof MiniHBaseClusterRegionServer) {
274      LOG.info("Killing " + server.toString());
275      ((MiniHBaseClusterRegionServer) server).kill();
276    } else {
277      abortRegionServer(getRegionServerIndex(serverName));
278    }
279  }
280
281  @Override
282  public boolean isKilledRS(ServerName serverName) {
283    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
284  }
285
286  @Override
287  public void stopRegionServer(ServerName serverName) throws IOException {
288    stopRegionServer(getRegionServerIndex(serverName));
289  }
290
291  @Override
292  public void suspendRegionServer(ServerName serverName) throws IOException {
293    suspendRegionServer(getRegionServerIndex(serverName));
294  }
295
296  @Override
297  public void resumeRegionServer(ServerName serverName) throws IOException {
298    resumeRegionServer(getRegionServerIndex(serverName));
299  }
300
301  @Override
302  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
303    // ignore timeout for now
304    waitOnRegionServer(getRegionServerIndex(serverName));
305  }
306
307  @Override
308  public void startZkNode(String hostname, int port) throws IOException {
309    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
310  }
311
312  @Override
313  public void killZkNode(ServerName serverName) throws IOException {
314    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
315  }
316
317  @Override
318  public void stopZkNode(ServerName serverName) throws IOException {
319    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
320  }
321
322  @Override
323  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
324    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
325  }
326
327  @Override
328  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
329    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
330  }
331
332  @Override
333  public void startDataNode(ServerName serverName) throws IOException {
334    LOG.warn("Starting datanodes on mini cluster is not supported");
335  }
336
337  @Override
338  public void killDataNode(ServerName serverName) throws IOException {
339    LOG.warn("Aborting datanodes on mini cluster is not supported");
340  }
341
342  @Override
343  public void stopDataNode(ServerName serverName) throws IOException {
344    LOG.warn("Stopping datanodes on mini cluster is not supported");
345  }
346
347  @Override
348  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
349    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
350  }
351
352  @Override
353  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
354    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
355  }
356
357  @Override
358  public void startNameNode(ServerName serverName) throws IOException {
359    LOG.warn("Starting namenodes on mini cluster is not supported");
360  }
361
362  @Override
363  public void killNameNode(ServerName serverName) throws IOException {
364    LOG.warn("Aborting namenodes on mini cluster is not supported");
365  }
366
367  @Override
368  public void stopNameNode(ServerName serverName) throws IOException {
369    LOG.warn("Stopping namenodes on mini cluster is not supported");
370  }
371
372  @Override
373  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
374    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
375  }
376
377  @Override
378  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
379    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
380  }
381
382  @Override
383  public void startMaster(String hostname, int port) throws IOException {
384    this.startMaster();
385  }
386
387  @Override
388  public void killMaster(ServerName serverName) throws IOException {
389    abortMaster(getMasterIndex(serverName));
390  }
391
392  @Override
393  public void stopMaster(ServerName serverName) throws IOException {
394    stopMaster(getMasterIndex(serverName));
395  }
396
397  @Override
398  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
399    // ignore timeout for now
400    waitOnMaster(getMasterIndex(serverName));
401  }
402
403  /**
404   * Starts a region server thread running n * @return New RegionServerThread
405   */
406  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
407    final Configuration newConf = HBaseConfiguration.create(conf);
408    return startRegionServer(newConf);
409  }
410
411  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
412    throws IOException {
413    User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
414    JVMClusterUtil.RegionServerThread t = null;
415    try {
416      t =
417        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
418      t.start();
419      t.waitForServerOnline();
420    } catch (InterruptedException ie) {
421      throw new IOException("Interrupted adding regionserver to cluster", ie);
422    }
423    return t;
424  }
425
426  /**
427   * Starts a region server thread and waits until its processed by master. Throws an exception when
428   * it can't start a region server or when the region server is not processed by master within the
429   * timeout.
430   * @return New RegionServerThread
431   */
432  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
433    throws IOException {
434
435    JVMClusterUtil.RegionServerThread t = startRegionServer();
436    ServerName rsServerName = t.getRegionServer().getServerName();
437
438    long start = EnvironmentEdgeManager.currentTime();
439    ClusterStatus clusterStatus = getClusterStatus();
440    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
441      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
442        return t;
443      }
444      Threads.sleep(100);
445    }
446    if (t.getRegionServer().isOnline()) {
447      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
448    } else {
449      throw new IOException("RS: " + rsServerName + " is offline");
450    }
451  }
452
453  /**
454   * Cause a region server to exit doing basic clean up only on its way out.
455   * @param serverNumber Used as index into a list.
456   */
457  public String abortRegionServer(int serverNumber) {
458    HRegionServer server = getRegionServer(serverNumber);
459    LOG.info("Aborting " + server.toString());
460    server.abort("Aborting for tests", new Exception("Trace info"));
461    return server.toString();
462  }
463
464  /**
465   * Shut down the specified region server cleanly
466   * @param serverNumber Used as index into a list.
467   * @return the region server that was stopped
468   */
469  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
470    return stopRegionServer(serverNumber, true);
471  }
472
473  /**
474   * Shut down the specified region server cleanly
475   * @param serverNumber Used as index into a list.
476   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
477   *                     shutdown. Usually we do but you do not want to do this if you are running
478   *                     multiple regionservers in a test and you shut down one before end of the
479   *                     test.
480   * @return the region server that was stopped
481   */
482  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
483    final boolean shutdownFS) {
484    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
485    LOG.info("Stopping " + server.toString());
486    server.getRegionServer().stop("Stopping rs " + serverNumber);
487    return server;
488  }
489
490  /**
491   * Suspend the specified region server
492   * @param serverNumber Used as index into a list. n
493   */
494  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
495    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
496    LOG.info("Suspending {}", server.toString());
497    server.suspend();
498    return server;
499  }
500
501  /**
502   * Resume the specified region server
503   * @param serverNumber Used as index into a list. n
504   */
505  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
506    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
507    LOG.info("Resuming {}", server.toString());
508    server.resume();
509    return server;
510  }
511
512  /**
513   * Wait for the specified region server to stop. Removes this thread from list of running threads.
514   * n * @return Name of region server that just went down.
515   */
516  public String waitOnRegionServer(final int serverNumber) {
517    return this.hbaseCluster.waitOnRegionServer(serverNumber);
518  }
519
520  /**
521   * Starts a master thread running
522   * @return New RegionServerThread
523   */
524  public JVMClusterUtil.MasterThread startMaster() throws IOException {
525    Configuration c = HBaseConfiguration.create(conf);
526    User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++);
527
528    JVMClusterUtil.MasterThread t = null;
529    try {
530      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
531      t.start();
532    } catch (InterruptedException ie) {
533      throw new IOException("Interrupted adding master to cluster", ie);
534    }
535    conf.set(HConstants.MASTER_ADDRS_KEY,
536      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
537    return t;
538  }
539
540  /**
541   * Returns the current active master, if available.
542   * @return the active HMaster, null if none is active.
543   */
544  @Override
545  public MasterService.BlockingInterface getMasterAdminService() {
546    return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
547  }
548
549  /**
550   * Returns the current active master, if available.
551   * @return the active HMaster, null if none is active.
552   */
553  public HMaster getMaster() {
554    return this.hbaseCluster.getActiveMaster();
555  }
556
557  /**
558   * Returns the current active master thread, if available.
559   * @return the active MasterThread, null if none is active.
560   */
561  public MasterThread getMasterThread() {
562    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
563      if (mt.getMaster().isActiveMaster()) {
564        return mt;
565      }
566    }
567    return null;
568  }
569
570  /**
571   * Returns the master at the specified index, if available.
572   * @return the active HMaster, null if none is active.
573   */
574  public HMaster getMaster(final int serverNumber) {
575    return this.hbaseCluster.getMaster(serverNumber);
576  }
577
578  /**
579   * Cause a master to exit without shutting down entire cluster.
580   * @param serverNumber Used as index into a list.
581   */
582  public String abortMaster(int serverNumber) {
583    HMaster server = getMaster(serverNumber);
584    LOG.info("Aborting " + server.toString());
585    server.abort("Aborting for tests", new Exception("Trace info"));
586    return server.toString();
587  }
588
589  /**
590   * Shut down the specified master cleanly
591   * @param serverNumber Used as index into a list.
592   * @return the region server that was stopped
593   */
594  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
595    return stopMaster(serverNumber, true);
596  }
597
598  /**
599   * Shut down the specified master cleanly
600   * @param serverNumber Used as index into a list.
601   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
602   *                     shutdown. Usually we do but you do not want to do this if you are running
603   *                     multiple master in a test and you shut down one before end of the test.
604   * @return the master that was stopped
605   */
606  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
607    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
608    LOG.info("Stopping " + server.toString());
609    server.getMaster().stop("Stopping master " + serverNumber);
610    return server;
611  }
612
613  /**
614   * Wait for the specified master to stop. Removes this thread from list of running threads. n
615   * * @return Name of master that just went down.
616   */
617  public String waitOnMaster(final int serverNumber) {
618    return this.hbaseCluster.waitOnMaster(serverNumber);
619  }
620
621  /**
622   * Blocks until there is an active master and that master has completed initialization.
623   * @return true if an active master becomes available. false if there are no masters left. n
624   */
625  @Override
626  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
627    List<JVMClusterUtil.MasterThread> mts;
628    long start = EnvironmentEdgeManager.currentTime();
629    while (
630      !(mts = getMasterThreads()).isEmpty()
631        && (EnvironmentEdgeManager.currentTime() - start) < timeout
632    ) {
633      for (JVMClusterUtil.MasterThread mt : mts) {
634        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
635          return true;
636        }
637      }
638
639      Threads.sleep(100);
640    }
641    return false;
642  }
643
644  /** Returns List of master threads. */
645  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
646    return this.hbaseCluster.getMasters();
647  }
648
649  /** Returns List of live master threads (skips the aborted and the killed) */
650  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
651    return this.hbaseCluster.getLiveMasters();
652  }
653
654  /**
655   * Wait for Mini HBase Cluster to shut down.
656   */
657  public void join() {
658    this.hbaseCluster.join();
659  }
660
661  /**
662   * Shut down the mini HBase cluster
663   */
664  @Override
665  public void shutdown() throws IOException {
666    if (this.hbaseCluster != null) {
667      this.hbaseCluster.shutdown();
668    }
669  }
670
671  @Override
672  public void close() throws IOException {
673  }
674
675  /**
676   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 Use
677   *             {@link #getClusterMetrics()} instead.
678   */
679  @Deprecated
680  public ClusterStatus getClusterStatus() throws IOException {
681    HMaster master = getMaster();
682    return master == null ? null : new ClusterStatus(master.getClusterMetrics());
683  }
684
685  @Override
686  public ClusterMetrics getClusterMetrics() throws IOException {
687    HMaster master = getMaster();
688    return master == null ? null : master.getClusterMetrics();
689  }
690
691  private void executeFlush(HRegion region) throws IOException {
692    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
693      return;
694    }
695    // retry 5 times if we can not flush
696    for (int i = 0; i < 5; i++) {
697      FlushResult result = region.flush(true);
698      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
699        return;
700      }
701      Threads.sleep(1000);
702    }
703  }
704
705  /**
706   * Call flushCache on all regions on all participating regionservers.
707   */
708  public void flushcache() throws IOException {
709    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
710      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
711        executeFlush(r);
712      }
713    }
714  }
715
716  /**
717   * Call flushCache on all regions of the specified table.
718   */
719  public void flushcache(TableName tableName) throws IOException {
720    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
721      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
722        if (r.getTableDescriptor().getTableName().equals(tableName)) {
723          executeFlush(r);
724        }
725      }
726    }
727  }
728
729  /**
730   * Call flushCache on all regions on all participating regionservers. n
731   */
732  public void compact(boolean major) throws IOException {
733    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
734      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
735        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
736          r.compact(major);
737        }
738      }
739    }
740  }
741
742  /**
743   * Call flushCache on all regions of the specified table. n
744   */
745  public void compact(TableName tableName, boolean major) throws IOException {
746    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
747      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
748        if (r.getTableDescriptor().getTableName().equals(tableName)) {
749          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
750            r.compact(major);
751          }
752        }
753      }
754    }
755  }
756
757  /** Returns Number of live region servers in the cluster currently. */
758  public int getNumLiveRegionServers() {
759    return this.hbaseCluster.getLiveRegionServers().size();
760  }
761
762  /**
763   * @return List of region server threads. Does not return the master even though it is also a
764   *         region server.
765   */
766  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
767    return this.hbaseCluster.getRegionServers();
768  }
769
770  /** Returns List of live region server threads (skips the aborted and the killed) */
771  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
772    return this.hbaseCluster.getLiveRegionServers();
773  }
774
775  /**
776   * Grab a numbered region server of your choice. n * @return region server
777   */
778  public HRegionServer getRegionServer(int serverNumber) {
779    return hbaseCluster.getRegionServer(serverNumber);
780  }
781
782  public HRegionServer getRegionServer(ServerName serverName) {
783    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
784      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
785  }
786
787  public List<HRegion> getRegions(byte[] tableName) {
788    return getRegions(TableName.valueOf(tableName));
789  }
790
791  public List<HRegion> getRegions(TableName tableName) {
792    List<HRegion> ret = new ArrayList<>();
793    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
794      HRegionServer hrs = rst.getRegionServer();
795      for (Region region : hrs.getOnlineRegionsLocalContext()) {
796        if (region.getTableDescriptor().getTableName().equals(tableName)) {
797          ret.add((HRegion) region);
798        }
799      }
800    }
801    return ret;
802  }
803
804  /**
805   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
806   *         regionName. Returns -1 if none found.
807   */
808  public int getServerWithMeta() {
809    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
810  }
811
812  /**
813   * Get the location of the specified region
814   * @param regionName Name of the region in bytes
815   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
816   *         hbase:meta. Returns -1 if none found.
817   */
818  public int getServerWith(byte[] regionName) {
819    int index = -1;
820    int count = 0;
821    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
822      HRegionServer hrs = rst.getRegionServer();
823      if (!hrs.isStopped()) {
824        Region region = hrs.getOnlineRegion(regionName);
825        if (region != null) {
826          index = count;
827          break;
828        }
829      }
830      count++;
831    }
832    return index;
833  }
834
835  @Override
836  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
837    throws IOException {
838    // Assume there is only one master thread which is the active master.
839    // If there are multiple master threads, the backup master threads
840    // should hold some regions. Please refer to #countServedRegions
841    // to see how we find out all regions.
842    HMaster master = getMaster();
843    Region region = master.getOnlineRegion(regionName);
844    if (region != null) {
845      return master.getServerName();
846    }
847    int index = getServerWith(regionName);
848    if (index < 0) {
849      return null;
850    }
851    return getRegionServer(index).getServerName();
852  }
853
854  /**
855   * Counts the total numbers of regions being served by the currently online region servers by
856   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
857   * catalog tables.
858   * @return number of regions being served by all region servers
859   */
860  public long countServedRegions() {
861    long count = 0;
862    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
863      count += rst.getRegionServer().getNumberOfOnlineRegions();
864    }
865    for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
866      count += mt.getMaster().getNumberOfOnlineRegions();
867    }
868    return count;
869  }
870
871  /**
872   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
873   * mini-cluster back for clean shutdown.
874   */
875  public void killAll() {
876    // Do backups first.
877    MasterThread activeMaster = null;
878    for (MasterThread masterThread : getMasterThreads()) {
879      if (!masterThread.getMaster().isActiveMaster()) {
880        masterThread.getMaster().abort("killAll");
881      } else {
882        activeMaster = masterThread;
883      }
884    }
885    // Do active after.
886    if (activeMaster != null) {
887      activeMaster.getMaster().abort("killAll");
888    }
889    for (RegionServerThread rst : getRegionServerThreads()) {
890      rst.getRegionServer().abort("killAll");
891    }
892  }
893
894  @Override
895  public void waitUntilShutDown() {
896    this.hbaseCluster.join();
897  }
898
899  public List<HRegion> findRegionsForTable(TableName tableName) {
900    ArrayList<HRegion> ret = new ArrayList<>();
901    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
902      HRegionServer hrs = rst.getRegionServer();
903      for (Region region : hrs.getRegions(tableName)) {
904        if (region.getTableDescriptor().getTableName().equals(tableName)) {
905          ret.add((HRegion) region);
906        }
907      }
908    }
909    return ret;
910  }
911
912  protected int getRegionServerIndex(ServerName serverName) {
913    // we have a small number of region servers, this should be fine for now.
914    List<RegionServerThread> servers = getRegionServerThreads();
915    for (int i = 0; i < servers.size(); i++) {
916      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
917        return i;
918      }
919    }
920    return -1;
921  }
922
923  protected int getMasterIndex(ServerName serverName) {
924    List<MasterThread> masters = getMasterThreads();
925    for (int i = 0; i < masters.size(); i++) {
926      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
927        return i;
928      }
929    }
930    return -1;
931  }
932
933  @Override
934  public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
935    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
936  }
937
938  @Override
939  public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
940    throws IOException {
941    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
942  }
943}