001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.RegionInfoBuilder;
029import org.apache.hadoop.hbase.client.RegionReplicaUtil;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.JVMClusterUtil;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.util.Threads;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
047
048/**
 * This class creates a single-process HBase cluster. The master uses the 'default' FileSystem.
 * The RegionServers, if we are running on DistributedFileSystem, each create their own FileSystem
 * instance and close that instance down on the way out.
052 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
053 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
054 */
055@InterfaceAudience.Public
056@Deprecated
057public class MiniHBaseCluster extends HBaseCluster {
  /** Logger shared by this class and its nested helper classes. */
  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
  /** The in-process cluster wrapped by this class; assigned while the cluster is initialized. */
  public LocalHBaseCluster hbaseCluster;
  /**
   * Counter appended to the ".hfs." suffix handed to
   * {@code HBaseTestingUtility.getDifferentUser}; incremented on every use.
   */
  private static int index;
061
  /**
   * Start a MiniHBaseCluster with a single master.
   * @param conf             Configuration to be used for cluster
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException          if the cluster fails to start
   * @throws InterruptedException declared by the constructor this one delegates to
   */
  public MiniHBaseCluster(Configuration conf, int numRegionServers)
    throws IOException, InterruptedException {
    // Delegate with the number of masters fixed at 1.
    this(conf, 1, numRegionServers);
  }
071
  /**
   * Start a MiniHBaseCluster without specifying master or region server classes.
   * @param conf             Configuration to be used for cluster
   * @param numMasters       initial number of masters to start.
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException          if the cluster fails to start
   * @throws InterruptedException declared by the constructor this one delegates to
   */
  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    // Null classes: the defaults are substituted during initialization.
    this(conf, numMasters, numRegionServers, null, null);
  }
082
  /**
   * Start a MiniHBaseCluster with no always-standby masters and no fixed region server ports.
   * @param conf              Configuration to be used for cluster
   * @param numMasters        initial number of masters to start.
   * @param numRegionServers  initial number of region servers to start.
   * @param masterClass       master implementation to use; when null, {@link HMaster} is used
   * @param regionserverClass region server implementation to use; when null,
   *                          {@link MiniHBaseClusterRegionServer} is used
   * @throws IOException          if the cluster fails to start
   * @throws InterruptedException declared by the constructor this one delegates to
   */
  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws IOException, InterruptedException {
    // 0 always-standby masters, null rsPorts (no fixed region server ports).
    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
  }
095
  /**
   * Start a MiniHBaseCluster, initializing the wrapped cluster and recording the initial cluster
   * metrics.
   * @param conf                    Configuration to be used for cluster
   * @param numMasters              initial number of masters to start.
   * @param numAlwaysStandByMasters number of always-standby masters handed to the wrapped cluster
   * @param numRegionServers        initial number of region servers to start.
   * @param rsPorts                 Ports that RegionServer should use; pass ports if you want to
   *                                test cluster restart where for sure the regionservers come up
   *                                on same address+port (but just with different startcode); by
   *                                default mini hbase clusters choose new arbitrary ports on each
   *                                cluster start. May be null.
   * @param masterClass             master implementation to use; when null, {@link HMaster} is
   *                                used
   * @param regionserverClass       region server implementation to use; when null,
   *                                {@link MiniHBaseClusterRegionServer} is used
   * @throws IOException          if the cluster fails to start
   * @throws InterruptedException declared by the initialization routine
   */
  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws IOException, InterruptedException {
    super(conf);

    // Hadoop 2
    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();

    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
      regionserverClass);
    // Snapshot the metrics right after startup; may be null when no master is active.
    this.initialClusterStatus = getClusterMetrics();
  }
115
116  public Configuration getConfiguration() {
117    return this.conf;
118  }
119
120  /**
121   * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
122   * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem
123   * only, not All filesystems as the FileSystem system exit hook does.
124   */
125  public static class MiniHBaseClusterRegionServer extends HRegionServer {
126    private Thread shutdownThread = null;
127    private User user = null;
128    /**
129     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
130     * restarted instances of the same server will have different ServerName and will not coincide
131     * with past dead ones. So there's no need to cleanup this list.
132     */
133    static Set<ServerName> killedServers = new HashSet<>();
134
135    public MiniHBaseClusterRegionServer(Configuration conf)
136      throws IOException, InterruptedException {
137      super(conf);
138      this.user = User.getCurrent();
139    }
140
141    /*
142     * @param currentfs We return this if we did not make a new one.
143     * @param uniqueName Same name used to help identify the created fs.
144     * @return A new fs instance if we are up on DistributeFileSystem.
145     */
146
147    @Override
148    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
149      throws IOException {
150      super.handleReportForDutyResponse(c);
151      // Run this thread to shutdown our filesystem on way out.
152      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
153    }
154
155    @Override
156    public void run() {
157      try {
158        this.user.runAs(new PrivilegedAction<Object>() {
159          @Override
160          public Object run() {
161            runRegionServer();
162            return null;
163          }
164        });
165      } catch (Throwable t) {
166        LOG.error("Exception in run", t);
167      } finally {
168        // Run this on the way out.
169        if (this.shutdownThread != null) {
170          this.shutdownThread.start();
171          Threads.shutdown(this.shutdownThread, 30000);
172        }
173      }
174    }
175
176    private void runRegionServer() {
177      super.run();
178    }
179
180    @Override
181    protected void kill() {
182      killedServers.add(getServerName());
183      super.kill();
184    }
185
186    @Override
187    public void abort(final String reason, final Throwable cause) {
188      this.user.runAs(new PrivilegedAction<Object>() {
189        @Override
190        public Object run() {
191          abortRegionServer(reason, cause);
192          return null;
193        }
194      });
195    }
196
197    private void abortRegionServer(String reason, Throwable cause) {
198      super.abort(reason, cause);
199    }
200  }
201
202  /**
203   * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
204   * does.
205   */
206  static class SingleFileSystemShutdownThread extends Thread {
207    private final FileSystem fs;
208
209    SingleFileSystemShutdownThread(final FileSystem fs) {
210      super("Shutdown of " + fs);
211      this.fs = fs;
212    }
213
214    @Override
215    public void run() {
216      try {
217        LOG.info("Hook closing fs=" + this.fs);
218        this.fs.close();
219      } catch (IOException e) {
220        LOG.warn("Running hook", e);
221      }
222    }
223  }
224
225  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
226    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
227    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
228    throws IOException, InterruptedException {
229    try {
230      if (masterClass == null) {
231        masterClass = HMaster.class;
232      }
233      if (regionserverClass == null) {
234        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
235      }
236
237      // start up a LocalHBaseCluster
238      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
239        masterClass, regionserverClass);
240
241      // manually add the regionservers as other users
242      for (int i = 0; i < nRegionNodes; i++) {
243        Configuration rsConf = HBaseConfiguration.create(conf);
244        if (rsPorts != null) {
245          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
246        }
247        User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
248        hbaseCluster.addRegionServer(rsConf, i, user);
249      }
250
251      hbaseCluster.startup();
252    } catch (IOException e) {
253      shutdown();
254      throw e;
255    } catch (Throwable t) {
256      LOG.error("Error starting cluster", t);
257      shutdown();
258      throw new IOException("Shutting down", t);
259    }
260  }
261
262  @Override
263  public void startRegionServer(String hostname, int port) throws IOException {
264    final Configuration newConf = HBaseConfiguration.create(conf);
265    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
266    startRegionServer(newConf);
267  }
268
269  @Override
270  public void killRegionServer(ServerName serverName) throws IOException {
271    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
272    if (server instanceof MiniHBaseClusterRegionServer) {
273      LOG.info("Killing " + server.toString());
274      ((MiniHBaseClusterRegionServer) server).kill();
275    } else {
276      abortRegionServer(getRegionServerIndex(serverName));
277    }
278  }
279
280  @Override
281  public boolean isKilledRS(ServerName serverName) {
282    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
283  }
284
285  @Override
286  public void stopRegionServer(ServerName serverName) throws IOException {
287    stopRegionServer(getRegionServerIndex(serverName));
288  }
289
290  @Override
291  public void suspendRegionServer(ServerName serverName) throws IOException {
292    suspendRegionServer(getRegionServerIndex(serverName));
293  }
294
295  @Override
296  public void resumeRegionServer(ServerName serverName) throws IOException {
297    resumeRegionServer(getRegionServerIndex(serverName));
298  }
299
300  @Override
301  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
302    // ignore timeout for now
303    waitOnRegionServer(getRegionServerIndex(serverName));
304  }
305
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
  }
310
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
  }
315
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
  }
320
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
  }
325
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
  }
330
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.warn("Starting datanodes on mini cluster is not supported");
  }
335
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting datanodes on mini cluster is not supported");
  }
340
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping datanodes on mini cluster is not supported");
  }
345
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
  }
350
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
  }
355
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.warn("Starting namenodes on mini cluster is not supported");
  }
360
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.warn("Aborting namenodes on mini cluster is not supported");
  }
365
  /** Not supported on the mini cluster: only logs a warning. */
  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.warn("Stopping namenodes on mini cluster is not supported");
  }
370
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
  }
375
  /** Not supported on the mini cluster: only logs a warning and returns without waiting. */
  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
  }
380
381  @Override
382  public void startMaster(String hostname, int port) throws IOException {
383    this.startMaster();
384  }
385
386  @Override
387  public void killMaster(ServerName serverName) throws IOException {
388    abortMaster(getMasterIndex(serverName));
389  }
390
391  @Override
392  public void stopMaster(ServerName serverName) throws IOException {
393    stopMaster(getMasterIndex(serverName));
394  }
395
396  @Override
397  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
398    // ignore timeout for now
399    waitOnMaster(getMasterIndex(serverName));
400  }
401
402  /**
403   * Starts a region server thread running
404   * @return New RegionServerThread
405   */
406  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
407    final Configuration newConf = HBaseConfiguration.create(conf);
408    return startRegionServer(newConf);
409  }
410
411  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
412    throws IOException {
413    User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
414    JVMClusterUtil.RegionServerThread t = null;
415    try {
416      t =
417        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
418      t.start();
419      t.waitForServerOnline();
420    } catch (InterruptedException ie) {
421      throw new IOException("Interrupted adding regionserver to cluster", ie);
422    }
423    return t;
424  }
425
426  /**
427   * Starts a region server thread and waits until its processed by master. Throws an exception when
428   * it can't start a region server or when the region server is not processed by master within the
429   * timeout.
430   * @return New RegionServerThread
431   */
432  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
433    throws IOException {
434
435    JVMClusterUtil.RegionServerThread t = startRegionServer();
436    ServerName rsServerName = t.getRegionServer().getServerName();
437
438    long start = EnvironmentEdgeManager.currentTime();
439    ClusterMetrics clusterStatus = getClusterMetrics();
440    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
441      if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
442        return t;
443      }
444      Threads.sleep(100);
445    }
446    if (t.getRegionServer().isOnline()) {
447      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
448    } else {
449      throw new IOException("RS: " + rsServerName + " is offline");
450    }
451  }
452
453  /**
454   * Cause a region server to exit doing basic clean up only on its way out.
455   * @param serverNumber Used as index into a list.
456   */
457  public String abortRegionServer(int serverNumber) {
458    HRegionServer server = getRegionServer(serverNumber);
459    LOG.info("Aborting " + server.toString());
460    server.abort("Aborting for tests", new Exception("Trace info"));
461    return server.toString();
462  }
463
464  /**
465   * Shut down the specified region server cleanly
466   * @param serverNumber Used as index into a list.
467   * @return the region server that was stopped
468   */
469  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
470    return stopRegionServer(serverNumber, true);
471  }
472
473  /**
474   * Shut down the specified region server cleanly
475   * @param serverNumber Used as index into a list.
476   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
477   *                     shutdown. Usually we do but you do not want to do this if you are running
478   *                     multiple regionservers in a test and you shut down one before end of the
479   *                     test.
480   * @return the region server that was stopped
481   */
482  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
483    final boolean shutdownFS) {
484    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
485    LOG.info("Stopping " + server.toString());
486    server.getRegionServer().stop("Stopping rs " + serverNumber);
487    return server;
488  }
489
490  /**
491   * Suspend the specified region server
492   * @param serverNumber Used as index into a list.
493   */
494  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
495    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
496    LOG.info("Suspending {}", server.toString());
497    server.suspend();
498    return server;
499  }
500
501  /**
502   * Resume the specified region server
503   * @param serverNumber Used as index into a list.
504   */
505  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
506    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
507    LOG.info("Resuming {}", server.toString());
508    server.resume();
509    return server;
510  }
511
512  /**
513   * Wait for the specified region server to stop. Removes this thread from list of running threads.
514   * @return Name of region server that just went down.
515   */
516  public String waitOnRegionServer(final int serverNumber) {
517    return this.hbaseCluster.waitOnRegionServer(serverNumber);
518  }
519
520  /**
521   * Starts a master thread running
522   * @return New RegionServerThread
523   */
524  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
525      value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
526      justification = "Testing only, not a big deal")
527  public JVMClusterUtil.MasterThread startMaster() throws IOException {
528    Configuration c = HBaseConfiguration.create(conf);
529    User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++);
530
531    JVMClusterUtil.MasterThread t = null;
532    try {
533      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
534      t.start();
535    } catch (InterruptedException ie) {
536      throw new IOException("Interrupted adding master to cluster", ie);
537    }
538    conf.set(HConstants.MASTER_ADDRS_KEY,
539      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
540    return t;
541  }
542
543  /**
544   * Returns the current active master, if available.
545   * @return the active HMaster, null if none is active.
546   */
547  public HMaster getMaster() {
548    return this.hbaseCluster.getActiveMaster();
549  }
550
551  /**
552   * Returns the current active master thread, if available.
553   * @return the active MasterThread, null if none is active.
554   */
555  public MasterThread getMasterThread() {
556    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
557      if (mt.getMaster().isActiveMaster()) {
558        return mt;
559      }
560    }
561    return null;
562  }
563
564  /**
565   * Returns the master at the specified index, if available.
566   * @return the active HMaster, null if none is active.
567   */
568  public HMaster getMaster(final int serverNumber) {
569    return this.hbaseCluster.getMaster(serverNumber);
570  }
571
572  /**
573   * Cause a master to exit without shutting down entire cluster.
574   * @param serverNumber Used as index into a list.
575   */
576  public String abortMaster(int serverNumber) {
577    HMaster server = getMaster(serverNumber);
578    LOG.info("Aborting " + server.toString());
579    server.abort("Aborting for tests", new Exception("Trace info"));
580    return server.toString();
581  }
582
583  /**
584   * Shut down the specified master cleanly
585   * @param serverNumber Used as index into a list.
586   * @return the region server that was stopped
587   */
588  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
589    return stopMaster(serverNumber, true);
590  }
591
592  /**
593   * Shut down the specified master cleanly
594   * @param serverNumber Used as index into a list.
595   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
596   *                     shutdown. Usually we do but you do not want to do this if you are running
597   *                     multiple master in a test and you shut down one before end of the test.
598   * @return the master that was stopped
599   */
600  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
601    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
602    LOG.info("Stopping " + server.toString());
603    server.getMaster().stop("Stopping master " + serverNumber);
604    return server;
605  }
606
607  /**
608   * Wait for the specified master to stop. Removes this thread from list of running threads.
609   * @return Name of master that just went down.
610   */
611  public String waitOnMaster(final int serverNumber) {
612    return this.hbaseCluster.waitOnMaster(serverNumber);
613  }
614
615  /**
616   * Blocks until there is an active master and that master has completed initialization.
617   * @return true if an active master becomes available. false if there are no masters left.
618   */
619  @Override
620  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
621    List<JVMClusterUtil.MasterThread> mts;
622    long start = EnvironmentEdgeManager.currentTime();
623    while (
624      !(mts = getMasterThreads()).isEmpty()
625        && (EnvironmentEdgeManager.currentTime() - start) < timeout
626    ) {
627      for (JVMClusterUtil.MasterThread mt : mts) {
628        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
629          return true;
630        }
631      }
632
633      Threads.sleep(100);
634    }
635    return false;
636  }
637
638  /** Returns List of master threads. */
639  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
640    return this.hbaseCluster.getMasters();
641  }
642
643  /** Returns List of live master threads (skips the aborted and the killed) */
644  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
645    return this.hbaseCluster.getLiveMasters();
646  }
647
648  /**
649   * Wait for Mini HBase Cluster to shut down.
650   */
651  public void join() {
652    this.hbaseCluster.join();
653  }
654
655  /**
656   * Shut down the mini HBase cluster
657   */
658  @Override
659  public void shutdown() throws IOException {
660    if (this.hbaseCluster != null) {
661      this.hbaseCluster.shutdown();
662    }
663  }
664
  /** Intentionally does nothing in this implementation. */
  @Override
  public void close() throws IOException {
  }
668
669  @Override
670  public ClusterMetrics getClusterMetrics() throws IOException {
671    HMaster master = getMaster();
672    return master == null ? null : master.getClusterMetrics();
673  }
674
675  private void executeFlush(HRegion region) throws IOException {
676    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
677      return;
678    }
679    // retry 5 times if we can not flush
680    for (int i = 0; i < 5; i++) {
681      FlushResult result = region.flush(true);
682      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
683        return;
684      }
685      Threads.sleep(1000);
686    }
687  }
688
689  /**
690   * Call flushCache on all regions on all participating regionservers.
691   */
692  public void flushcache() throws IOException {
693    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
694      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
695        executeFlush(r);
696      }
697    }
698  }
699
700  /**
701   * Call flushCache on all regions of the specified table.
702   */
703  public void flushcache(TableName tableName) throws IOException {
704    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
705      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
706        if (r.getTableDescriptor().getTableName().equals(tableName)) {
707          executeFlush(r);
708        }
709      }
710    }
711  }
712
713  /**
714   * Call flushCache on all regions on all participating regionservers.
715   */
716  public void compact(boolean major) throws IOException {
717    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
718      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
719        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
720          r.compact(major);
721        }
722      }
723    }
724  }
725
726  /**
727   * Call flushCache on all regions of the specified table.
728   */
729  public void compact(TableName tableName, boolean major) throws IOException {
730    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
731      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
732        if (r.getTableDescriptor().getTableName().equals(tableName)) {
733          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
734            r.compact(major);
735          }
736        }
737      }
738    }
739  }
740
741  /** Returns Number of live region servers in the cluster currently. */
742  public int getNumLiveRegionServers() {
743    return this.hbaseCluster.getLiveRegionServers().size();
744  }
745
746  /**
747   * @return List of region server threads. Does not return the master even though it is also a
748   *         region server.
749   */
750  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
751    return this.hbaseCluster.getRegionServers();
752  }
753
754  /** Returns List of live region server threads (skips the aborted and the killed) */
755  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
756    return this.hbaseCluster.getLiveRegionServers();
757  }
758
759  /**
760   * Grab a numbered region server of your choice.
761   * @return region server
762   */
763  public HRegionServer getRegionServer(int serverNumber) {
764    return hbaseCluster.getRegionServer(serverNumber);
765  }
766
767  public HRegionServer getRegionServer(ServerName serverName) {
768    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
769      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
770  }
771
772  public List<HRegion> getRegions(byte[] tableName) {
773    return getRegions(TableName.valueOf(tableName));
774  }
775
776  public List<HRegion> getRegions(TableName tableName) {
777    List<HRegion> ret = new ArrayList<>();
778    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
779      HRegionServer hrs = rst.getRegionServer();
780      for (Region region : hrs.getOnlineRegionsLocalContext()) {
781        if (region.getTableDescriptor().getTableName().equals(tableName)) {
782          ret.add((HRegion) region);
783        }
784      }
785    }
786    return ret;
787  }
788
789  /**
790   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
791   *         regionName. Returns -1 if none found.
792   */
793  public int getServerWithMeta() {
794    return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
795  }
796
797  /**
798   * Get the location of the specified region
799   * @param regionName Name of the region in bytes
800   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
801   *         hbase:meta. Returns -1 if none found.
802   */
803  public int getServerWith(byte[] regionName) {
804    int index = 0;
805    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
806      HRegionServer hrs = rst.getRegionServer();
807      if (!hrs.isStopped()) {
808        Region region = hrs.getOnlineRegion(regionName);
809        if (region != null) {
810          return index;
811        }
812      }
813      index++;
814    }
815    return -1;
816  }
817
818  @Override
819  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
820    throws IOException {
821    int index = getServerWith(regionName);
822    if (index < 0) {
823      return null;
824    }
825    return getRegionServer(index).getServerName();
826  }
827
828  /**
829   * Counts the total numbers of regions being served by the currently online region servers by
830   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
831   * catalog tables.
832   * @return number of regions being served by all region servers
833   */
834  public long countServedRegions() {
835    long count = 0;
836    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
837      count += rst.getRegionServer().getNumberOfOnlineRegions();
838    }
839    return count;
840  }
841
842  /**
843   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
844   * mini-cluster back for clean shutdown.
845   */
846  public void killAll() {
847    // Do backups first.
848    MasterThread activeMaster = null;
849    for (MasterThread masterThread : getMasterThreads()) {
850      if (!masterThread.getMaster().isActiveMaster()) {
851        masterThread.getMaster().abort("killAll");
852      } else {
853        activeMaster = masterThread;
854      }
855    }
856    // Do active after.
857    if (activeMaster != null) {
858      activeMaster.getMaster().abort("killAll");
859    }
860    for (RegionServerThread rst : getRegionServerThreads()) {
861      rst.getRegionServer().abort("killAll");
862    }
863  }
864
865  @Override
866  public void waitUntilShutDown() {
867    this.hbaseCluster.join();
868  }
869
870  public List<HRegion> findRegionsForTable(TableName tableName) {
871    ArrayList<HRegion> ret = new ArrayList<>();
872    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
873      HRegionServer hrs = rst.getRegionServer();
874      for (Region region : hrs.getRegions(tableName)) {
875        if (region.getTableDescriptor().getTableName().equals(tableName)) {
876          ret.add((HRegion) region);
877        }
878      }
879    }
880    return ret;
881  }
882
883  protected int getRegionServerIndex(ServerName serverName) {
884    // we have a small number of region servers, this should be fine for now.
885    List<RegionServerThread> servers = getRegionServerThreads();
886    for (int i = 0; i < servers.size(); i++) {
887      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
888        return i;
889      }
890    }
891    return -1;
892  }
893
894  protected int getMasterIndex(ServerName serverName) {
895    List<MasterThread> masters = getMasterThreads();
896    for (int i = 0; i < masters.size(); i++) {
897      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
898        return i;
899      }
900    }
901    return -1;
902  }
903}