001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.PrivilegedAction;
023import java.util.ArrayList;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.Region;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.test.MetricsAssertHelper;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
049
050/**
051 * This class creates a single process HBase cluster.
052 * each server.  The master uses the 'default' FileSystem.  The RegionServers,
053 * if we are running on DistributedFilesystem, create a FileSystem instance
054 * each and will close down their instance on the way out.
055 */
056@InterfaceAudience.Public
057public class MiniHBaseCluster extends HBaseCluster {
058  private static final Logger LOG = LoggerFactory.getLogger(MiniHBaseCluster.class.getName());
059  public LocalHBaseCluster hbaseCluster;
060  private static int index;
061
062  /**
063   * Start a MiniHBaseCluster.
064   * @param conf Configuration to be used for cluster
065   * @param numRegionServers initial number of region servers to start.
066   * @throws IOException
067   */
068  public MiniHBaseCluster(Configuration conf, int numRegionServers)
069  throws IOException, InterruptedException {
070    this(conf, 1, numRegionServers);
071  }
072
073  /**
074   * Start a MiniHBaseCluster.
075   * @param conf Configuration to be used for cluster
076   * @param numMasters initial number of masters to start.
077   * @param numRegionServers initial number of region servers to start.
078   * @throws IOException
079   */
080  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
081      throws IOException, InterruptedException {
082    this(conf, numMasters, numRegionServers, null, null);
083  }
084
085  /**
086   * Start a MiniHBaseCluster.
087   * @param conf Configuration to be used for cluster
088   * @param numMasters initial number of masters to start.
089   * @param numRegionServers initial number of region servers to start.
090   */
091  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
092         Class<? extends HMaster> masterClass,
093         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
094      throws IOException, InterruptedException {
095    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
096  }
097
098  /**
099   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
100   *   restart where for sure the regionservers come up on same address+port (but
101   *   just with different startcode); by default mini hbase clusters choose new
102   *   arbitrary ports on each cluster start.
103   * @throws IOException
104   * @throws InterruptedException
105   */
106  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
107         int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
108         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
109      throws IOException, InterruptedException {
110    super(conf);
111
112    // Hadoop 2
113    CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
114
115    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
116        regionserverClass);
117    this.initialClusterStatus = getClusterMetrics();
118  }
119
120  public Configuration getConfiguration() {
121    return this.conf;
122  }
123
124  /**
125   * Subclass so can get at protected methods (none at moment).  Also, creates
126   * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
127   * on the way out. Shuts down own Filesystem only, not All filesystems as
128   * the FileSystem system exit hook does.
129   */
130  public static class MiniHBaseClusterRegionServer extends HRegionServer {
131    private Thread shutdownThread = null;
132    private User user = null;
133    /**
134     * List of RegionServers killed so far. ServerName also comprises startCode of a server,
135     * so any restarted instances of the same server will have different ServerName and will not
136     * coincide with past dead ones. So there's no need to cleanup this list.
137     */
138    static Set<ServerName> killedServers = new HashSet<>();
139
140    public MiniHBaseClusterRegionServer(Configuration conf)
141        throws IOException, InterruptedException {
142      super(conf);
143      this.user = User.getCurrent();
144    }
145
146    /*
147     * @param c
148     * @param currentfs We return this if we did not make a new one.
149     * @param uniqueName Same name used to help identify the created fs.
150     * @return A new fs instance if we are up on DistributeFileSystem.
151     * @throws IOException
152     */
153
154    @Override
155    protected void handleReportForDutyResponse(
156        final RegionServerStartupResponse c) throws IOException {
157      super.handleReportForDutyResponse(c);
158      // Run this thread to shutdown our filesystem on way out.
159      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
160    }
161
162    @Override
163    public void run() {
164      try {
165        this.user.runAs(new PrivilegedAction<Object>() {
166          @Override
167          public Object run() {
168            runRegionServer();
169            return null;
170          }
171        });
172      } catch (Throwable t) {
173        LOG.error("Exception in run", t);
174      } finally {
175        // Run this on the way out.
176        if (this.shutdownThread != null) {
177          this.shutdownThread.start();
178          Threads.shutdown(this.shutdownThread, 30000);
179        }
180      }
181    }
182
183    private void runRegionServer() {
184      super.run();
185    }
186
187    @Override
188    protected void kill() {
189      killedServers.add(getServerName());
190      super.kill();
191    }
192
193    @Override
194    public void abort(final String reason, final Throwable cause) {
195      this.user.runAs(new PrivilegedAction<Object>() {
196        @Override
197        public Object run() {
198          abortRegionServer(reason, cause);
199          return null;
200        }
201      });
202    }
203
204    private void abortRegionServer(String reason, Throwable cause) {
205      super.abort(reason, cause);
206    }
207  }
208
209  /**
210   * Alternate shutdown hook.
211   * Just shuts down the passed fs, not all as default filesystem hook does.
212   */
213  static class SingleFileSystemShutdownThread extends Thread {
214    private final FileSystem fs;
215    SingleFileSystemShutdownThread(final FileSystem fs) {
216      super("Shutdown of " + fs);
217      this.fs = fs;
218    }
219    @Override
220    public void run() {
221      try {
222        LOG.info("Hook closing fs=" + this.fs);
223        this.fs.close();
224      } catch (NullPointerException npe) {
225        LOG.debug("Need to fix these: " + npe.toString());
226      } catch (IOException e) {
227        LOG.warn("Running hook", e);
228      }
229    }
230  }
231
232  private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
233      final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
234      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
235  throws IOException, InterruptedException {
236    try {
237      if (masterClass == null){
238        masterClass =  HMaster.class;
239      }
240      if (regionserverClass == null){
241        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
242      }
243
244      // start up a LocalHBaseCluster
245      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
246          masterClass, regionserverClass);
247
248      // manually add the regionservers as other users
249      for (int i = 0; i < nRegionNodes; i++) {
250        Configuration rsConf = HBaseConfiguration.create(conf);
251        if (rsPorts != null) {
252          rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
253        }
254        User user = HBaseTestingUtility.getDifferentUser(rsConf,
255            ".hfs."+index++);
256        hbaseCluster.addRegionServer(rsConf, i, user);
257      }
258
259      hbaseCluster.startup();
260    } catch (IOException e) {
261      shutdown();
262      throw e;
263    } catch (Throwable t) {
264      LOG.error("Error starting cluster", t);
265      shutdown();
266      throw new IOException("Shutting down", t);
267    }
268  }
269
270  @Override
271  public void startRegionServer(String hostname, int port) throws IOException {
272    final Configuration newConf = HBaseConfiguration.create(conf);
273    newConf.setInt(HConstants.REGIONSERVER_PORT, port);
274    startRegionServer(newConf);
275  }
276
277  @Override
278  public void killRegionServer(ServerName serverName) throws IOException {
279    HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
280    if (server instanceof MiniHBaseClusterRegionServer) {
281      LOG.info("Killing " + server.toString());
282      ((MiniHBaseClusterRegionServer) server).kill();
283    } else {
284      abortRegionServer(getRegionServerIndex(serverName));
285    }
286  }
287
288  @Override
289  public boolean isKilledRS(ServerName serverName) {
290    return MiniHBaseClusterRegionServer.killedServers.contains(serverName);
291  }
292
293  @Override
294  public void stopRegionServer(ServerName serverName) throws IOException {
295    stopRegionServer(getRegionServerIndex(serverName));
296  }
297
298  @Override
299  public void suspendRegionServer(ServerName serverName) throws IOException {
300    suspendRegionServer(getRegionServerIndex(serverName));
301  }
302
303  @Override
304  public void resumeRegionServer(ServerName serverName) throws IOException {
305    resumeRegionServer(getRegionServerIndex(serverName));
306  }
307
308  @Override
309  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
310    //ignore timeout for now
311    waitOnRegionServer(getRegionServerIndex(serverName));
312  }
313
314  @Override
315  public void startZkNode(String hostname, int port) throws IOException {
316    LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
317  }
318
319  @Override
320  public void killZkNode(ServerName serverName) throws IOException {
321    LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
322  }
323
324  @Override
325  public void stopZkNode(ServerName serverName) throws IOException {
326    LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
327  }
328
329  @Override
330  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
331    LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
332  }
333
334  @Override
335  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
336    LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
337  }
338
339  @Override
340  public void startDataNode(ServerName serverName) throws IOException {
341    LOG.warn("Starting datanodes on mini cluster is not supported");
342  }
343
344  @Override
345  public void killDataNode(ServerName serverName) throws IOException {
346    LOG.warn("Aborting datanodes on mini cluster is not supported");
347  }
348
349  @Override
350  public void stopDataNode(ServerName serverName) throws IOException {
351    LOG.warn("Stopping datanodes on mini cluster is not supported");
352  }
353
354  @Override
355  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
356    LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
357  }
358
359  @Override
360  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
361    LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
362  }
363
364  @Override
365  public void startNameNode(ServerName serverName) throws IOException {
366    LOG.warn("Starting namenodes on mini cluster is not supported");
367  }
368
369  @Override
370  public void killNameNode(ServerName serverName) throws IOException {
371    LOG.warn("Aborting namenodes on mini cluster is not supported");
372  }
373
374  @Override
375  public void stopNameNode(ServerName serverName) throws IOException {
376    LOG.warn("Stopping namenodes on mini cluster is not supported");
377  }
378
379  @Override
380  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
381    LOG.warn("Waiting for namenodes to start on mini cluster is not supported");
382  }
383
384  @Override
385  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
386    LOG.warn("Waiting for namenodes to stop on mini cluster is not supported");
387  }
388
389  @Override
390  public void startMaster(String hostname, int port) throws IOException {
391    this.startMaster();
392  }
393
394  @Override
395  public void killMaster(ServerName serverName) throws IOException {
396    abortMaster(getMasterIndex(serverName));
397  }
398
399  @Override
400  public void stopMaster(ServerName serverName) throws IOException {
401    stopMaster(getMasterIndex(serverName));
402  }
403
404  @Override
405  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
406    //ignore timeout for now
407    waitOnMaster(getMasterIndex(serverName));
408  }
409
410  /**
411   * Starts a region server thread running
412   *
413   * @throws IOException
414   * @return New RegionServerThread
415   */
416  public JVMClusterUtil.RegionServerThread startRegionServer()
417      throws IOException {
418    final Configuration newConf = HBaseConfiguration.create(conf);
419    return startRegionServer(newConf);
420  }
421
422  private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
423      throws IOException {
424    User rsUser =
425        HBaseTestingUtility.getDifferentUser(configuration, ".hfs."+index++);
426    JVMClusterUtil.RegionServerThread t =  null;
427    try {
428      t = hbaseCluster.addRegionServer(
429          configuration, hbaseCluster.getRegionServers().size(), rsUser);
430      t.start();
431      t.waitForServerOnline();
432    } catch (InterruptedException ie) {
433      throw new IOException("Interrupted adding regionserver to cluster", ie);
434    }
435    return t;
436  }
437
438  /**
439   * Starts a region server thread and waits until its processed by master. Throws an exception
440   * when it can't start a region server or when the region server is not processed by master
441   * within the timeout.
442   *
443   * @return New RegionServerThread
444   */
445  public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout)
446      throws IOException {
447
448    JVMClusterUtil.RegionServerThread t =  startRegionServer();
449    ServerName rsServerName = t.getRegionServer().getServerName();
450
451    long start = System.currentTimeMillis();
452    ClusterStatus clusterStatus = getClusterStatus();
453    while ((System.currentTimeMillis() - start) < timeout) {
454      if (clusterStatus != null && clusterStatus.getServers().contains(rsServerName)) {
455        return t;
456      }
457      Threads.sleep(100);
458    }
459    if (t.getRegionServer().isOnline()) {
460      throw new IOException("RS: " + rsServerName + " online, but not processed by master");
461    } else {
462      throw new IOException("RS: " + rsServerName + " is offline");
463    }
464  }
465
466  /**
467   * Cause a region server to exit doing basic clean up only on its way out.
468   * @param serverNumber  Used as index into a list.
469   */
470  public String abortRegionServer(int serverNumber) {
471    HRegionServer server = getRegionServer(serverNumber);
472    LOG.info("Aborting " + server.toString());
473    server.abort("Aborting for tests", new Exception("Trace info"));
474    return server.toString();
475  }
476
477  /**
478   * Shut down the specified region server cleanly
479   *
480   * @param serverNumber  Used as index into a list.
481   * @return the region server that was stopped
482   */
483  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
484    return stopRegionServer(serverNumber, true);
485  }
486
487  /**
488   * Shut down the specified region server cleanly
489   *
490   * @param serverNumber  Used as index into a list.
491   * @param shutdownFS True is we are to shutdown the filesystem as part of this
492   * regionserver's shutdown.  Usually we do but you do not want to do this if
493   * you are running multiple regionservers in a test and you shut down one
494   * before end of the test.
495   * @return the region server that was stopped
496   */
497  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
498      final boolean shutdownFS) {
499    JVMClusterUtil.RegionServerThread server =
500      hbaseCluster.getRegionServers().get(serverNumber);
501    LOG.info("Stopping " + server.toString());
502    server.getRegionServer().stop("Stopping rs " + serverNumber);
503    return server;
504  }
505
506  /**
507   * Suspend the specified region server
508   * @param serverNumber Used as index into a list.
509   * @return
510   */
511  public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) {
512    JVMClusterUtil.RegionServerThread server =
513        hbaseCluster.getRegionServers().get(serverNumber);
514    LOG.info("Suspending {}", server.toString());
515    server.suspend();
516    return server;
517  }
518
519  /**
520   * Resume the specified region server
521   * @param serverNumber Used as index into a list.
522   * @return
523   */
524  public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) {
525    JVMClusterUtil.RegionServerThread server =
526        hbaseCluster.getRegionServers().get(serverNumber);
527    LOG.info("Resuming {}", server.toString());
528    server.resume();
529    return server;
530  }
531
532  /**
533   * Wait for the specified region server to stop. Removes this thread from list
534   * of running threads.
535   * @param serverNumber
536   * @return Name of region server that just went down.
537   */
538  public String waitOnRegionServer(final int serverNumber) {
539    return this.hbaseCluster.waitOnRegionServer(serverNumber);
540  }
541
542
543  /**
544   * Starts a master thread running
545   *
546   * @return New RegionServerThread
547   */
548  public JVMClusterUtil.MasterThread startMaster() throws IOException {
549    Configuration c = HBaseConfiguration.create(conf);
550    User user =
551        HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
552
553    JVMClusterUtil.MasterThread t = null;
554    try {
555      t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
556      t.start();
557    } catch (InterruptedException ie) {
558      throw new IOException("Interrupted adding master to cluster", ie);
559    }
560    conf.set(HConstants.MASTER_ADDRS_KEY,
561        hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
562    return t;
563  }
564
565  /**
566   * Returns the current active master, if available.
567   * @return the active HMaster, null if none is active.
568   */
569  @Override
570  public MasterService.BlockingInterface getMasterAdminService() {
571    return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
572  }
573
574  /**
575   * Returns the current active master, if available.
576   * @return the active HMaster, null if none is active.
577   */
578  public HMaster getMaster() {
579    return this.hbaseCluster.getActiveMaster();
580  }
581
582  /**
583   * Returns the current active master thread, if available.
584   * @return the active MasterThread, null if none is active.
585   */
586  public MasterThread getMasterThread() {
587    for (MasterThread mt: hbaseCluster.getLiveMasters()) {
588      if (mt.getMaster().isActiveMaster()) {
589        return mt;
590      }
591    }
592    return null;
593  }
594
595  /**
596   * Returns the master at the specified index, if available.
597   * @return the active HMaster, null if none is active.
598   */
599  public HMaster getMaster(final int serverNumber) {
600    return this.hbaseCluster.getMaster(serverNumber);
601  }
602
603  /**
604   * Cause a master to exit without shutting down entire cluster.
605   * @param serverNumber  Used as index into a list.
606   */
607  public String abortMaster(int serverNumber) {
608    HMaster server = getMaster(serverNumber);
609    LOG.info("Aborting " + server.toString());
610    server.abort("Aborting for tests", new Exception("Trace info"));
611    return server.toString();
612  }
613
614  /**
615   * Shut down the specified master cleanly
616   *
617   * @param serverNumber  Used as index into a list.
618   * @return the region server that was stopped
619   */
620  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
621    return stopMaster(serverNumber, true);
622  }
623
624  /**
625   * Shut down the specified master cleanly
626   *
627   * @param serverNumber  Used as index into a list.
628   * @param shutdownFS True is we are to shutdown the filesystem as part of this
629   * master's shutdown.  Usually we do but you do not want to do this if
630   * you are running multiple master in a test and you shut down one
631   * before end of the test.
632   * @return the master that was stopped
633   */
634  public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
635      final boolean shutdownFS) {
636    JVMClusterUtil.MasterThread server =
637      hbaseCluster.getMasters().get(serverNumber);
638    LOG.info("Stopping " + server.toString());
639    server.getMaster().stop("Stopping master " + serverNumber);
640    return server;
641  }
642
643  /**
644   * Wait for the specified master to stop. Removes this thread from list
645   * of running threads.
646   * @param serverNumber
647   * @return Name of master that just went down.
648   */
649  public String waitOnMaster(final int serverNumber) {
650    return this.hbaseCluster.waitOnMaster(serverNumber);
651  }
652
653  /**
654   * Blocks until there is an active master and that master has completed
655   * initialization.
656   *
657   * @return true if an active master becomes available.  false if there are no
658   *         masters left.
659   * @throws InterruptedException
660   */
661  @Override
662  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
663    List<JVMClusterUtil.MasterThread> mts;
664    long start = System.currentTimeMillis();
665    while (!(mts = getMasterThreads()).isEmpty()
666        && (System.currentTimeMillis() - start) < timeout) {
667      for (JVMClusterUtil.MasterThread mt : mts) {
668        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
669          return true;
670        }
671      }
672
673      Threads.sleep(100);
674    }
675    return false;
676  }
677
678  /**
679   * @return List of master threads.
680   */
681  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
682    return this.hbaseCluster.getMasters();
683  }
684
685  /**
686   * @return List of live master threads (skips the aborted and the killed)
687   */
688  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
689    return this.hbaseCluster.getLiveMasters();
690  }
691
692  /**
693   * Wait for Mini HBase Cluster to shut down.
694   */
695  public void join() {
696    this.hbaseCluster.join();
697  }
698
699  /**
700   * Shut down the mini HBase cluster
701   */
702  @Override
703  public void shutdown() throws IOException {
704    if (this.hbaseCluster != null) {
705      this.hbaseCluster.shutdown();
706    }
707  }
708
709  @Override
710  public void close() throws IOException {
711  }
712
713  /**
714   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
715   *             Use {@link #getClusterMetrics()} instead.
716   */
717  @Deprecated
718  public ClusterStatus getClusterStatus() throws IOException {
719    HMaster master = getMaster();
720    return master == null ? null : new ClusterStatus(master.getClusterMetrics());
721  }
722
723  @Override
724  public ClusterMetrics getClusterMetrics() throws IOException {
725    HMaster master = getMaster();
726    return master == null ? null : master.getClusterMetrics();
727  }
728
729  private void executeFlush(HRegion region) throws IOException {
730    // retry 5 times if we can not flush
731    for (int i = 0; i < 5; i++) {
732      FlushResult result = region.flush(true);
733      if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) {
734        return;
735      }
736      Threads.sleep(1000);
737    }
738  }
739
740  /**
741   * Call flushCache on all regions on all participating regionservers.
742   */
743  public void flushcache() throws IOException {
744    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
745      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
746        executeFlush(r);
747      }
748    }
749  }
750
751  /**
752   * Call flushCache on all regions of the specified table.
753   */
754  public void flushcache(TableName tableName) throws IOException {
755    for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) {
756      for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) {
757        if (r.getTableDescriptor().getTableName().equals(tableName)) {
758          executeFlush(r);
759        }
760      }
761    }
762  }
763
764  /**
765   * Call flushCache on all regions on all participating regionservers.
766   * @throws IOException
767   */
768  public void compact(boolean major) throws IOException {
769    for (JVMClusterUtil.RegionServerThread t:
770        this.hbaseCluster.getRegionServers()) {
771      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
772        r.compact(major);
773      }
774    }
775  }
776
777  /**
778   * Call flushCache on all regions of the specified table.
779   * @throws IOException
780   */
781  public void compact(TableName tableName, boolean major) throws IOException {
782    for (JVMClusterUtil.RegionServerThread t:
783        this.hbaseCluster.getRegionServers()) {
784      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
785        if(r.getTableDescriptor().getTableName().equals(tableName)) {
786          r.compact(major);
787        }
788      }
789    }
790  }
791
792  /**
793   * @return Number of live region servers in the cluster currently.
794   */
795  public int getNumLiveRegionServers() {
796    return this.hbaseCluster.getLiveRegionServers().size();
797  }
798
799  /**
800   * @return List of region server threads. Does not return the master even though it is also
801   * a region server.
802   */
803  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
804    return this.hbaseCluster.getRegionServers();
805  }
806
807  /**
808   * @return List of live region server threads (skips the aborted and the killed)
809   */
810  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
811    return this.hbaseCluster.getLiveRegionServers();
812  }
813
814  /**
815   * Grab a numbered region server of your choice.
816   * @param serverNumber
817   * @return region server
818   */
819  public HRegionServer getRegionServer(int serverNumber) {
820    return hbaseCluster.getRegionServer(serverNumber);
821  }
822
823  public HRegionServer getRegionServer(ServerName serverName) {
824    return hbaseCluster.getRegionServers().stream()
825        .map(t -> t.getRegionServer())
826        .filter(r -> r.getServerName().equals(serverName))
827        .findFirst().orElse(null);
828  }
829
830  public List<HRegion> getRegions(byte[] tableName) {
831    return getRegions(TableName.valueOf(tableName));
832  }
833
834  public List<HRegion> getRegions(TableName tableName) {
835    List<HRegion> ret = new ArrayList<>();
836    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
837      HRegionServer hrs = rst.getRegionServer();
838      for (Region region : hrs.getOnlineRegionsLocalContext()) {
839        if (region.getTableDescriptor().getTableName().equals(tableName)) {
840          ret.add((HRegion)region);
841        }
842      }
843    }
844    return ret;
845  }
846
847  /**
848   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
849   * of HRS carrying regionName. Returns -1 if none found.
850   */
851  public int getServerWithMeta() {
852    return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
853  }
854
855  /**
856   * Get the location of the specified region
857   * @param regionName Name of the region in bytes
858   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
859   * of HRS carrying hbase:meta. Returns -1 if none found.
860   */
861  public int getServerWith(byte[] regionName) {
862    int index = -1;
863    int count = 0;
864    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
865      HRegionServer hrs = rst.getRegionServer();
866      if (!hrs.isStopped()) {
867        Region region = hrs.getOnlineRegion(regionName);
868        if (region != null) {
869          index = count;
870          break;
871        }
872      }
873      count++;
874    }
875    return index;
876  }
877
878  @Override
879  public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
880  throws IOException {
881    // Assume there is only one master thread which is the active master.
882    // If there are multiple master threads, the backup master threads
883    // should hold some regions. Please refer to #countServedRegions
884    // to see how we find out all regions.
885    HMaster master = getMaster();
886    Region region = master.getOnlineRegion(regionName);
887    if (region != null) {
888      return master.getServerName();
889    }
890    int index = getServerWith(regionName);
891    if (index < 0) {
892      return null;
893    }
894    return getRegionServer(index).getServerName();
895  }
896
897  /**
898   * Counts the total numbers of regions being served by the currently online
899   * region servers by asking each how many regions they have.  Does not look
900   * at hbase:meta at all.  Count includes catalog tables.
901   * @return number of regions being served by all region servers
902   */
903  public long countServedRegions() {
904    long count = 0;
905    for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
906      count += rst.getRegionServer().getNumberOfOnlineRegions();
907    }
908    for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
909      count += mt.getMaster().getNumberOfOnlineRegions();
910    }
911    return count;
912  }
913
914  /**
915   * Do a simulated kill all masters and regionservers. Useful when it is
916   * impossible to bring the mini-cluster back for clean shutdown.
917   */
918  public void killAll() {
919    // Do backups first.
920    MasterThread activeMaster = null;
921    for (MasterThread masterThread : getMasterThreads()) {
922      if (!masterThread.getMaster().isActiveMaster()) {
923        masterThread.getMaster().abort("killAll");
924      } else {
925        activeMaster = masterThread;
926      }
927    }
928    // Do active after.
929    if (activeMaster != null) {
930      activeMaster.getMaster().abort("killAll");
931    }
932    for (RegionServerThread rst : getRegionServerThreads()) {
933      rst.getRegionServer().abort("killAll");
934    }
935  }
936
937  @Override
938  public void waitUntilShutDown() {
939    this.hbaseCluster.join();
940  }
941
942  public List<HRegion> findRegionsForTable(TableName tableName) {
943    ArrayList<HRegion> ret = new ArrayList<>();
944    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
945      HRegionServer hrs = rst.getRegionServer();
946      for (Region region : hrs.getRegions(tableName)) {
947        if (region.getTableDescriptor().getTableName().equals(tableName)) {
948          ret.add((HRegion)region);
949        }
950      }
951    }
952    return ret;
953  }
954
955
956  protected int getRegionServerIndex(ServerName serverName) {
957    //we have a small number of region servers, this should be fine for now.
958    List<RegionServerThread> servers = getRegionServerThreads();
959    for (int i=0; i < servers.size(); i++) {
960      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
961        return i;
962      }
963    }
964    return -1;
965  }
966
967  protected int getMasterIndex(ServerName serverName) {
968    List<MasterThread> masters = getMasterThreads();
969    for (int i = 0; i < masters.size(); i++) {
970      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
971        return i;
972      }
973    }
974    return -1;
975  }
976
977  @Override
978  public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
979    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
980  }
981
982  @Override
983  public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
984  throws IOException {
985    return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
986  }
987}