001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.ArrayList;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.RegionInfoBuilder;
029import org.apache.hadoop.hbase.client.RegionReplicaUtil;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.JVMClusterUtil;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.util.Threads;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
047
048/**
 * This class creates a single-process HBase cluster. The master uses the 'default' FileSystem.
 * The RegionServers, if we are running on a DistributedFileSystem, each create their own
 * FileSystem instance and close that instance on their way out.
052 * @deprecated since 3.0.0, will be removed in 4.0.0. Use
053 *             {@link org.apache.hadoop.hbase.testing.TestingHBaseCluster} instead.
054 */
055@InterfaceAudience.Public
056@Deprecated
057public class MiniHBaseCluster extends HBaseCluster {
058  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
059  public LocalHBaseCluster hbaseCluster;
060  private static int index;
061
062  /**
063   * Start a MiniHBaseCluster.
064   * @param conf             Configuration to be used for cluster
065   * @param numRegionServers initial number of region servers to start. n
066   */
067  public MiniHBaseCluster(Configuration conf, int numRegionServers)
068    throws IOException, InterruptedException {
069    this(conf, 1, numRegionServers);
070  }
071
072  /**
073   * Start a MiniHBaseCluster.
074   * @param conf             Configuration to be used for cluster
075   * @param numMasters       initial number of masters to start.
076   * @param numRegionServers initial number of region servers to start. n
077   */
078  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
079    throws IOException, InterruptedException {
080    this(conf, numMasters, numRegionServers, null, null);
081  }
082
083  /**
084   * Start a MiniHBaseCluster.
085   * @param conf             Configuration to be used for cluster
086   * @param numMasters       initial number of masters to start.
087   * @param numRegionServers initial number of region servers to start.
088   */
089  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
090    Class<? extends HMaster> masterClass,
091    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
092    throws IOException, InterruptedException {
093    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
094  }
095
096  /**
097   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
098   *                restart where for sure the regionservers come up on same address+port (but just
099   *                with different startcode); by default mini hbase clusters choose new arbitrary
100   *                ports on each cluster start. nn
101   */
102  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
103    int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
104    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
105    throws IOException, InterruptedException {
106    super(conf);
107
108    // Hadoop 2
109    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
110
111    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
112      regionserverClass);
113    this.initialClusterStatus = getClusterMetrics();
114  }
115
116  public Configuration getConfiguration() {
117    return this.conf;
118  }
119
120  /**
121   * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
122   * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem
123   * only, not All filesystems as the FileSystem system exit hook does.
124   */
125  public static class MiniHBaseClusterRegionServer extends HRegionServer {
126    private Thread shutdownThread = null;
127    private User user = null;
128    /**
129     * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
130     * restarted instances of the same server will have different ServerName and will not coincide
131     * with past dead ones. So there's no need to cleanup this list.
132     */
133    static Set<ServerName> killedServers = new HashSet<>();
134
135    public MiniHBaseClusterRegionServer(Configuration conf)
136      throws IOException, InterruptedException {
137      super(conf);
138      this.user = User.getCurrent();
139    }
140
141    /*
142     * n * @param currentfs We return this if we did not make a new one.
143     * @param uniqueName Same name used to help identify the created fs.
144     * @return A new fs instance if we are up on DistributeFileSystem. n
145     */
146
147    @Override
148    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
149      throws IOException {
150      super.handleReportForDutyResponse(c);
151      // Run this thread to shutdown our filesystem on way out.
152      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
153    }
154
155    @Override
156    public void run() {
157      try {
158        this.user.runAs(new PrivilegedAction<Object>() {
159          @Override
160          public Object run() {
161            runRegionServer();
162            return null;
163          }
164        });
165      } catch (Throwable t) {
166        LOG.error("Exception in run", t);
167      } finally {
168        // Run this on the way out.
169        if (this.shutdownThread != null) {
170          this.shutdownThread.start();
171          Threads.shutdown(this.shutdownThread, 30000);
172        }
173      }
174    }
175
176    private void runRegionServer() {
177      super.run();
178    }
179
180    @Override
181    protected void kill() {
182      killedServers.add(getServerName());
183      super.kill();
184    }
185
186    @Override
187    public void abort(final String reason, final Throwable cause) {
188      this.user.runAs(new PrivilegedAction<Object>() {
189        @Override
190        public Object run() {
191          abortRegionServer(reason, cause);
192          return null;
193        }
194      });
195    }
196
197    private void abortRegionServer(String reason, Throwable cause) {
198      super.abort(reason, cause);
199    }
200  }
201
202  /**
203   * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
204   * does.
205   */
206  static class SingleFileSystemShutdownThread extends Thread {
207    private final FileSystem fs;
208
209    SingleFileSystemShutdownThread(final FileSystem fs) {
210      super("Shutdown of " + fs);
211      this.fs = fs;
212    }
213
214    @Override
215    public void run() {
216      try {
217        LOG.info("Hook closing fs=" + this.fs);
218        this.fs.close();
219      } catch (NullPointerException npe) {
220        LOG.debug("Need to fix these: " + npe.toString());
221      } catch (IOException e) {
222        LOG.warn("Running hook", e);
223      }
224    }
225  }
226
227  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
228    final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
229    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
230    throws IOException, InterruptedException {
231    try {
232      if (masterClass == null) {
233        masterClass = HMaster.class;
234      }
235      if (regionserverClass == null) {
236        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
237      }
238
239      // start up a LocalHBaseCluster
240      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
241        masterClass, regionserverClass);
242
243      // manually add the regionservers as other users
244      for (int i = 0; i < nRegionNodes; i++) {
245        Configuration rsConf = HBaseConfiguration.create(conf);
246        if (rsPorts != null) {
247          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
248        }
249        User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
250        hbaseCluster.addRegionServer(rsConf, i, user);
251      }
252
253      hbaseCluster.startup();
254    } catch (IOException e) {
255      shutdown();
256      throw e;
257    } catch (Throwable t) {
258      LOG.error("Error starting cluster", t);
259      shutdown();
260      throw new IOException("Shutting down", t);
261    }
262  }
263
264  @Override
265  public void startRegionServer(String hostname, int port) throws IOException {
266    final Configuration newConf = HBaseConfiguration.create(conf);
267    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
268    startRegionServer(newConf);
269  }
270
271  @Override
272  public void killRegionServer(ServerName serverName) throws IOException {
273    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
274    if (server instanceof MiniHBaseClusterRegionServer) {
275      LOG.info("Killing " + server.toString());
276      ((MiniHBaseClusterRegionServer) server).kill();
277    } else {
278      abortRegionServer(getRegionServerIndex(serverName));
279    }
280  }
281
282  @Override
283  public boolean isKilledRS(ServerName serverName) {
284    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
285  }
286
287  @Override
288  public void stopRegionServer(ServerName serverName) throws IOException {
289    stopRegionServer(getRegionServerIndex(serverName));
290  }
291
292  @Override
293  public void suspendRegionServer(ServerName serverName) throws IOException {
294    suspendRegionServer(getRegionServerIndex(serverName));
295  }
296
297  @Override
298  public void resumeRegionServer(ServerName serverName) throws IOException {
299    resumeRegionServer(getRegionServerIndex(serverName));
300  }
301
302  @Override
303  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
304    // ignore timeout for now
305    waitOnRegionServer(getRegionServerIndex(serverName));
306  }
307
308  @Override
309  public void startZkNode(String hostname, int port) throws IOException {
310    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
311  }
312
313  @Override
314  public void killZkNode(ServerName serverName) throws IOException {
315    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
316  }
317
318  @Override
319  public void stopZkNode(ServerName serverName) throws IOException {
320    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
321  }
322
323  @Override
324  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
325    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
326  }
327
328  @Override
329  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
330    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
331  }
332
333  @Override
334  public void startDataNode(ServerName serverName) throws IOException {
335    LOG.warn("Starting datanodes on mini cluster is not supported");
336  }
337
338  @Override
339  public void killDataNode(ServerName serverName) throws IOException {
340    LOG.warn("Aborting datanodes on mini cluster is not supported");
341  }
342
343  @Override
344  public void stopDataNode(ServerName serverName) throws IOException {
345    LOG.warn("Stopping datanodes on mini cluster is not supported");
346  }
347
348  @Override
349  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
350    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
351  }
352
353  @Override
354  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
355    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
356  }
357
358  @Override
359  public void startNameNode(ServerName serverName) throws IOException {
360    LOG.warn("Starting namenodes on mini cluster is not supported");
361  }
362
363  @Override
364  public void killNameNode(ServerName serverName) throws IOException {
365    LOG.warn("Aborting namenodes on mini cluster is not supported");
366  }
367
368  @Override
369  public void stopNameNode(ServerName serverName) throws IOException {
370    LOG.warn("Stopping namenodes on mini cluster is not supported");
371  }
372
373  @Override
374  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
375    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
376  }
377
378  @Override
379  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
380    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
381  }
382
383  @Override
384  public void startMaster(String hostname, int port) throws IOException {
385    this.startMaster();
386  }
387
388  @Override
389  public void killMaster(ServerName serverName) throws IOException {
390    abortMaster(getMasterIndex(serverName));
391  }
392
393  @Override
394  public void stopMaster(ServerName serverName) throws IOException {
395    stopMaster(getMasterIndex(serverName));
396  }
397
398  @Override
399  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
400    // ignore timeout for now
401    waitOnMaster(getMasterIndex(serverName));
402  }
403
404  /**
405   * Starts a region server thread running n * @return New RegionServerThread
406   */
407  public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
408    final Configuration newConf = HBaseConfiguration.create(conf);
409    return startRegionServer(newConf);
410  }
411
412  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
413    throws IOException {
414    User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
415    JVMClusterUtil.RegionServerThread t = null;
416    try {
417      t =
418        hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser);
419      t.start();
420      t.waitForServerOnline();
421    } catch (InterruptedException ie) {
422      throw new IOException("Interrupted adding regionserver to cluster", ie);
423    }
424    return t;
425  }
426
427  /**
428   * Starts a region server thread and waits until its processed by master. Throws an exception when
429   * it can't start a region server or when the region server is not processed by master within the
430   * timeout.
431   * @return New RegionServerThread
432   */
433  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
434    throws IOException {
435
436    JVMClusterUtil.RegionServerThread t = startRegionServer();
437    ServerName rsServerName = t.getRegionServer().getServerName();
438
439    long start = EnvironmentEdgeManager.currentTime();
440    ClusterMetrics clusterStatus = getClusterMetrics();
441    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
442      if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) {
443        return t;
444      }
445      Threads.sleep(100);
446    }
447    if (t.getRegionServer().isOnline()) {
448      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
449    } else {
450      throw new IOException("RS: " + rsServerName + " is offline");
451    }
452  }
453
454  /**
455   * Cause a region server to exit doing basic clean up only on its way out.
456   * @param serverNumber Used as index into a list.
457   */
458  public String abortRegionServer(int serverNumber) {
459    HRegionServer server = getRegionServer(serverNumber);
460    LOG.info("Aborting " + server.toString());
461    server.abort("Aborting for tests", new Exception("Trace info"));
462    return server.toString();
463  }
464
465  /**
466   * Shut down the specified region server cleanly
467   * @param serverNumber Used as index into a list.
468   * @return the region server that was stopped
469   */
470  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
471    return stopRegionServer(serverNumber, true);
472  }
473
474  /**
475   * Shut down the specified region server cleanly
476   * @param serverNumber Used as index into a list.
477   * @param shutdownFS   True is we are to shutdown the filesystem as part of this regionserver's
478   *                     shutdown. Usually we do but you do not want to do this if you are running
479   *                     multiple regionservers in a test and you shut down one before end of the
480   *                     test.
481   * @return the region server that was stopped
482   */
483  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
484    final boolean shutdownFS) {
485    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
486    LOG.info("Stopping " + server.toString());
487    server.getRegionServer().stop("Stopping rs " + serverNumber);
488    return server;
489  }
490
491  /**
492   * Suspend the specified region server
493   * @param serverNumber Used as index into a list.
494   */
495  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
496    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
497    LOG.info("Suspending {}", server.toString());
498    server.suspend();
499    return server;
500  }
501
502  /**
503   * Resume the specified region server
504   * @param serverNumber Used as index into a list.
505   */
506  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
507    JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
508    LOG.info("Resuming {}", server.toString());
509    server.resume();
510    return server;
511  }
512
513  /**
514   * Wait for the specified region server to stop. Removes this thread from list of running threads.
515   * n * @return Name of region server that just went down.
516   */
517  public String waitOnRegionServer(final int serverNumber) {
518    return this.hbaseCluster.waitOnRegionServer(serverNumber);
519  }
520
521  /**
522   * Starts a master thread running
523   * @return New RegionServerThread
524   */
525  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
526      value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
527      justification = "Testing only, not a big deal")
528  public JVMClusterUtil.MasterThread startMaster() throws IOException {
529    Configuration c = HBaseConfiguration.create(conf);
530    User user = HBaseTestingUtility.getDifferentUser(c, ".hfs." + index++);
531
532    JVMClusterUtil.MasterThread t = null;
533    try {
534      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
535      t.start();
536    } catch (InterruptedException ie) {
537      throw new IOException("Interrupted adding master to cluster", ie);
538    }
539    conf.set(HConstants.MASTER_ADDRS_KEY,
540      hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
541    return t;
542  }
543
544  /**
545   * Returns the current active master, if available.
546   * @return the active HMaster, null if none is active.
547   */
548  public HMaster getMaster() {
549    return this.hbaseCluster.getActiveMaster();
550  }
551
552  /**
553   * Returns the current active master thread, if available.
554   * @return the active MasterThread, null if none is active.
555   */
556  public MasterThread getMasterThread() {
557    for (MasterThread mt : hbaseCluster.getLiveMasters()) {
558      if (mt.getMaster().isActiveMaster()) {
559        return mt;
560      }
561    }
562    return null;
563  }
564
565  /**
566   * Returns the master at the specified index, if available.
567   * @return the active HMaster, null if none is active.
568   */
569  public HMaster getMaster(final int serverNumber) {
570    return this.hbaseCluster.getMaster(serverNumber);
571  }
572
573  /**
574   * Cause a master to exit without shutting down entire cluster.
575   * @param serverNumber Used as index into a list.
576   */
577  public String abortMaster(int serverNumber) {
578    HMaster server = getMaster(serverNumber);
579    LOG.info("Aborting " + server.toString());
580    server.abort("Aborting for tests", new Exception("Trace info"));
581    return server.toString();
582  }
583
584  /**
585   * Shut down the specified master cleanly
586   * @param serverNumber Used as index into a list.
587   * @return the region server that was stopped
588   */
589  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
590    return stopMaster(serverNumber, true);
591  }
592
593  /**
594   * Shut down the specified master cleanly
595   * @param serverNumber Used as index into a list.
596   * @param shutdownFS   True is we are to shutdown the filesystem as part of this master's
597   *                     shutdown. Usually we do but you do not want to do this if you are running
598   *                     multiple master in a test and you shut down one before end of the test.
599   * @return the master that was stopped
600   */
601  public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) {
602    JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber);
603    LOG.info("Stopping " + server.toString());
604    server.getMaster().stop("Stopping master " + serverNumber);
605    return server;
606  }
607
608  /**
609   * Wait for the specified master to stop. Removes this thread from list of running threads. n
610   * * @return Name of master that just went down.
611   */
612  public String waitOnMaster(final int serverNumber) {
613    return this.hbaseCluster.waitOnMaster(serverNumber);
614  }
615
616  /**
617   * Blocks until there is an active master and that master has completed initialization.
618   * @return true if an active master becomes available. false if there are no masters left. n
619   */
620  @Override
621  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
622    List<JVMClusterUtil.MasterThread> mts;
623    long start = EnvironmentEdgeManager.currentTime();
624    while (
625      !(mts = getMasterThreads()).isEmpty()
626        && (EnvironmentEdgeManager.currentTime() - start) < timeout
627    ) {
628      for (JVMClusterUtil.MasterThread mt : mts) {
629        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
630          return true;
631        }
632      }
633
634      Threads.sleep(100);
635    }
636    return false;
637  }
638
639  /**
640   * @return List of master threads.
641   */
642  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
643    return this.hbaseCluster.getMasters();
644  }
645
646  /**
647   * @return List of live master threads (skips the aborted and the killed)
648   */
649  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
650    return this.hbaseCluster.getLiveMasters();
651  }
652
653  /**
654   * Wait for Mini HBase Cluster to shut down.
655   */
656  public void join() {
657    this.hbaseCluster.join();
658  }
659
660  /**
661   * Shut down the mini HBase cluster
662   */
663  @Override
664  public void shutdown() throws IOException {
665    if (this.hbaseCluster != null) {
666      this.hbaseCluster.shutdown();
667    }
668  }
669
670  @Override
671  public void close() throws IOException {
672  }
673
674  @Override
675  public ClusterMetrics getClusterMetrics() throws IOException {
676    HMaster master = getMaster();
677    return master == null ? null : master.getClusterMetrics();
678  }
679
680  private void executeFlush(HRegion region) throws IOException {
681    if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
682      return;
683    }
684    // retry 5 times if we can not flush
685    for (int i = 0; i < 5; i++) {
686      FlushResult result = region.flush(true);
687      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
688        return;
689      }
690      Threads.sleep(1000);
691    }
692  }
693
694  /**
695   * Call flushCache on all regions on all participating regionservers.
696   */
697  public void flushcache() throws IOException {
698    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
699      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
700        executeFlush(r);
701      }
702    }
703  }
704
705  /**
706   * Call flushCache on all regions of the specified table.
707   */
708  public void flushcache(TableName tableName) throws IOException {
709    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
710      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
711        if (r.getTableDescriptor().getTableName().equals(tableName)) {
712          executeFlush(r);
713        }
714      }
715    }
716  }
717
718  /**
719   * Call flushCache on all regions on all participating regionservers. n
720   */
721  public void compact(boolean major) throws IOException {
722    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
723      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
724        if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
725          r.compact(major);
726        }
727      }
728    }
729  }
730
731  /**
732   * Call flushCache on all regions of the specified table. n
733   */
734  public void compact(TableName tableName, boolean major) throws IOException {
735    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
736      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
737        if (r.getTableDescriptor().getTableName().equals(tableName)) {
738          if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
739            r.compact(major);
740          }
741        }
742      }
743    }
744  }
745
746  /**
747   * @return Number of live region servers in the cluster currently.
748   */
749  public int getNumLiveRegionServers() {
750    return this.hbaseCluster.getLiveRegionServers().size();
751  }
752
753  /**
754   * @return List of region server threads. Does not return the master even though it is also a
755   *         region server.
756   */
757  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
758    return this.hbaseCluster.getRegionServers();
759  }
760
761  /**
762   * @return List of live region server threads (skips the aborted and the killed)
763   */
764  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
765    return this.hbaseCluster.getLiveRegionServers();
766  }
767
768  /**
769   * Grab a numbered region server of your choice. n * @return region server
770   */
771  public HRegionServer getRegionServer(int serverNumber) {
772    return hbaseCluster.getRegionServer(serverNumber);
773  }
774
775  public HRegionServer getRegionServer(ServerName serverName) {
776    return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer())
777      .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null);
778  }
779
780  public List<HRegion> getRegions(byte[] tableName) {
781    return getRegions(TableName.valueOf(tableName));
782  }
783
784  public List<HRegion> getRegions(TableName tableName) {
785    List<HRegion> ret = new ArrayList<>();
786    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
787      HRegionServer hrs = rst.getRegionServer();
788      for (Region region : hrs.getOnlineRegionsLocalContext()) {
789        if (region.getTableDescriptor().getTableName().equals(tableName)) {
790          ret.add((HRegion) region);
791        }
792      }
793    }
794    return ret;
795  }
796
797  /**
798   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
799   *         regionName. Returns -1 if none found.
800   */
801  public int getServerWithMeta() {
802    return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
803  }
804
805  /**
806   * Get the location of the specified region
807   * @param regionName Name of the region in bytes
808   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} of HRS carrying
809   *         hbase:meta. Returns -1 if none found.
810   */
811  public int getServerWith(byte[] regionName) {
812    int index = 0;
813    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
814      HRegionServer hrs = rst.getRegionServer();
815      if (!hrs.isStopped()) {
816        Region region = hrs.getOnlineRegion(regionName);
817        if (region != null) {
818          return index;
819        }
820      }
821      index++;
822    }
823    return -1;
824  }
825
826  @Override
827  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
828    throws IOException {
829    int index = getServerWith(regionName);
830    if (index < 0) {
831      return null;
832    }
833    return getRegionServer(index).getServerName();
834  }
835
836  /**
837   * Counts the total numbers of regions being served by the currently online region servers by
838   * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
839   * catalog tables.
840   * @return number of regions being served by all region servers
841   */
842  public long countServedRegions() {
843    long count = 0;
844    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
845      count += rst.getRegionServer().getNumberOfOnlineRegions();
846    }
847    return count;
848  }
849
850  /**
851   * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
852   * mini-cluster back for clean shutdown.
853   */
854  public void killAll() {
855    // Do backups first.
856    MasterThread activeMaster = null;
857    for (MasterThread masterThread : getMasterThreads()) {
858      if (!masterThread.getMaster().isActiveMaster()) {
859        masterThread.getMaster().abort("killAll");
860      } else {
861        activeMaster = masterThread;
862      }
863    }
864    // Do active after.
865    if (activeMaster != null) {
866      activeMaster.getMaster().abort("killAll");
867    }
868    for (RegionServerThread rst : getRegionServerThreads()) {
869      rst.getRegionServer().abort("killAll");
870    }
871  }
872
873  @Override
874  public void waitUntilShutDown() {
875    this.hbaseCluster.join();
876  }
877
878  public List<HRegion> findRegionsForTable(TableName tableName) {
879    ArrayList<HRegion> ret = new ArrayList<>();
880    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
881      HRegionServer hrs = rst.getRegionServer();
882      for (Region region : hrs.getRegions(tableName)) {
883        if (region.getTableDescriptor().getTableName().equals(tableName)) {
884          ret.add((HRegion) region);
885        }
886      }
887    }
888    return ret;
889  }
890
891  protected int getRegionServerIndex(ServerName serverName) {
892    // we have a small number of region servers, this should be fine for now.
893    List<RegionServerThread> servers = getRegionServerThreads();
894    for (int i = 0; i < servers.size(); i++) {
895      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
896        return i;
897      }
898    }
899    return -1;
900  }
901
902  protected int getMasterIndex(ServerName serverName) {
903    List<MasterThread> masters = getMasterThreads();
904    for (int i = 0; i < masters.size(); i++) {
905      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
906        return i;
907      }
908    }
909    return -1;
910  }
911}