001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.client.Admin;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.util.JVMClusterUtil;
034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * This class creates a single process HBase cluster. One thread is created for a master and one per
042 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to
043 * close it all down. {@link #join} the cluster is you want to wait on shutdown completion.
044 * <p>
045 * Runs master on port 16000 by default. Because we can't just kill the process -- not till
046 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote
047 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of
048 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead
049 * of 16000.
050 */
051@InterfaceAudience.Public
052public class LocalHBaseCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
054  private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
055  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
056    new CopyOnWriteArrayList<>();
057  private final static int DEFAULT_NO = 1;
058  /** local mode */
059  public static final String LOCAL = "local";
060  /** 'local:' */
061  public static final String LOCAL_COLON = LOCAL + ":";
062  public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports";
063
064  private final Configuration conf;
065  private final Class<? extends HMaster> masterClass;
066  private final Class<? extends HRegionServer> regionServerClass;
067
068  /**
069   * Constructor. nn
070   */
071  public LocalHBaseCluster(final Configuration conf) throws IOException {
072    this(conf, DEFAULT_NO);
073  }
074
075  /**
076   * Constructor.
077   * @param conf            Configuration to use. Post construction has the master's address.
078   * @param noRegionServers Count of regionservers to start. n
079   */
080  public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException {
081    this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
082      getRegionServerImplementation(conf));
083  }
084
085  /**
086   * Constructor.
087   * @param conf            Configuration to use. Post construction has the active master address.
088   * @param noMasters       Count of masters to start.
089   * @param noRegionServers Count of regionservers to start. n
090   */
091  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers)
092    throws IOException {
093    this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
094      getRegionServerImplementation(conf));
095  }
096
097  @SuppressWarnings("unchecked")
098  private static Class<? extends HRegionServer>
099    getRegionServerImplementation(final Configuration conf) {
100    return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
101      HRegionServer.class);
102  }
103
104  @SuppressWarnings("unchecked")
105  private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
106    return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class);
107  }
108
109  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
110    final Class<? extends HMaster> masterClass,
111    final Class<? extends HRegionServer> regionServerClass) throws IOException {
112    this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
113  }
114
115  /**
116   * Constructor.
117   * @param conf            Configuration to use. Post construction has the master's address.
118   * @param noMasters       Count of masters to start.
119   * @param noRegionServers Count of regionservers to start. nnn
120   */
121  @SuppressWarnings("unchecked")
122  public LocalHBaseCluster(final Configuration conf, final int noMasters,
123    final int noAlwaysStandByMasters, final int noRegionServers,
124    final Class<? extends HMaster> masterClass,
125    final Class<? extends HRegionServer> regionServerClass) throws IOException {
126    this.conf = conf;
127
128    // When active, if a port selection is default then we switch to random
129    if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) {
130      if (
131        conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)
132            == HConstants.DEFAULT_MASTER_PORT
133      ) {
134        LOG.debug("Setting Master Port to random.");
135        conf.set(HConstants.MASTER_PORT, "0");
136      }
137      if (
138        conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT)
139            == HConstants.DEFAULT_REGIONSERVER_PORT
140      ) {
141        LOG.debug("Setting RegionServer Port to random.");
142        conf.set(HConstants.REGIONSERVER_PORT, "0");
143      }
144      // treat info ports special; expressly don't change '-1' (keep off)
145      // in case we make that the default behavior.
146      if (
147        conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1
148          && conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
149            HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT
150      ) {
151        LOG.debug("Setting RS InfoServer Port to random.");
152        conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
153      }
154      if (
155        conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1
156          && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
157              == HConstants.DEFAULT_MASTER_INFOPORT
158      ) {
159        LOG.debug("Setting Master InfoServer Port to random.");
160        conf.set(HConstants.MASTER_INFO_PORT, "0");
161      }
162    }
163
164    this.masterClass =
165      (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass);
166    // Start the HMasters.
167    int i;
168    for (i = 0; i < noMasters; i++) {
169      addMaster(new Configuration(conf), i);
170    }
171    for (int j = 0; j < noAlwaysStandByMasters; j++) {
172      Configuration c = new Configuration(conf);
173      c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
174      addMaster(c, i + j);
175    }
176    // Start the HRegionServers.
177    this.regionServerClass = (Class<? extends HRegionServer>) conf
178      .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass);
179
180    for (int j = 0; j < noRegionServers; j++) {
181      addRegionServer(new Configuration(conf), j);
182    }
183  }
184
185  public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
186    return addRegionServer(new Configuration(conf), this.regionThreads.size());
187  }
188
189  @SuppressWarnings("unchecked")
190  public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index)
191    throws IOException {
192    // Create each regionserver with its own Configuration instance so each has
193    // its Connection instance rather than share (see HBASE_INSTANCES down in
194    // the guts of ConnectionManager).
195    JVMClusterUtil.RegionServerThread rst =
196      JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
197        .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
198
199    this.regionThreads.add(rst);
200    return rst;
201  }
202
203  public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config,
204    final int index, User user) throws IOException, InterruptedException {
205    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
206      @Override
207      public JVMClusterUtil.RegionServerThread run() throws Exception {
208        return addRegionServer(config, index);
209      }
210    });
211  }
212
213  public JVMClusterUtil.MasterThread addMaster() throws IOException {
214    return addMaster(new Configuration(conf), this.masterThreads.size());
215  }
216
217  public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
218    throws IOException {
219    // Create each master with its own Configuration instance so each has
220    // its Connection instance rather than share (see HBASE_INSTANCES down in
221    // the guts of ConnectionManager.
222    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
223      (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
224    this.masterThreads.add(mt);
225    // Refresh the master address config.
226    List<String> masterHostPorts = new ArrayList<>();
227    getMasters().forEach(masterThread -> masterHostPorts
228      .add(masterThread.getMaster().getServerName().getAddress().toString()));
229    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
230    return mt;
231  }
232
233  public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user)
234    throws IOException, InterruptedException {
235    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
236      @Override
237      public JVMClusterUtil.MasterThread run() throws Exception {
238        return addMaster(c, index);
239      }
240    });
241  }
242
243  /**
244   * n * @return region server
245   */
246  public HRegionServer getRegionServer(int serverNumber) {
247    return regionThreads.get(serverNumber).getRegionServer();
248  }
249
250  /** Returns Read-only list of region server threads. */
251  public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
252    return Collections.unmodifiableList(this.regionThreads);
253  }
254
255  /**
256   * @return List of running servers (Some servers may have been killed or aborted during lifetime
257   *         of cluster; these servers are not included in this list).
258   */
259  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
260    List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>();
261    List<RegionServerThread> list = getRegionServers();
262    for (JVMClusterUtil.RegionServerThread rst : list) {
263      if (rst.isAlive()) liveServers.add(rst);
264      else LOG.info("Not alive " + rst.getName());
265    }
266    return liveServers;
267  }
268
269  /** Returns the Configuration used by this LocalHBaseCluster */
270  public Configuration getConfiguration() {
271    return this.conf;
272  }
273
274  /**
275   * Wait for the specified region server to stop. Removes this thread from list of running threads.
276   * @return Name of region server that just went down.
277   */
278  public String waitOnRegionServer(int serverNumber) {
279    JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber);
280    return waitOnRegionServer(regionServerThread);
281  }
282
283  /**
284   * Wait for the specified region server to stop. Removes this thread from list of running threads.
285   * @return Name of region server that just went down.
286   */
287  public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
288    boolean interrupted = false;
289    while (rst.isAlive()) {
290      try {
291        LOG.info("Waiting on " + rst.getRegionServer().toString());
292        rst.join();
293      } catch (InterruptedException e) {
294        LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e);
295        interrupted = true;
296      }
297    }
298    regionThreads.remove(rst);
299    if (interrupted) {
300      Thread.currentThread().interrupt();
301    }
302    return rst.getName();
303  }
304
305  /** Returns the HMaster thread */
306  public HMaster getMaster(int serverNumber) {
307    return masterThreads.get(serverNumber).getMaster();
308  }
309
310  /**
311   * Gets the current active master, if available. If no active master, returns null.
312   * @return the HMaster for the active master
313   */
314  public HMaster getActiveMaster() {
315    for (JVMClusterUtil.MasterThread mt : masterThreads) {
316      // Ensure that the current active master is not stopped.
317      // We don't want to return a stopping master as an active master.
318      if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
319        return mt.getMaster();
320      }
321    }
322    return null;
323  }
324
325  /** Returns Read-only list of master threads. */
326  public List<JVMClusterUtil.MasterThread> getMasters() {
327    return Collections.unmodifiableList(this.masterThreads);
328  }
329
330  /**
331   * @return List of running master servers (Some servers may have been killed or aborted during
332   *         lifetime of cluster; these servers are not included in this list).
333   */
334  public List<JVMClusterUtil.MasterThread> getLiveMasters() {
335    List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>();
336    List<JVMClusterUtil.MasterThread> list = getMasters();
337    for (JVMClusterUtil.MasterThread mt : list) {
338      if (mt.isAlive()) {
339        liveServers.add(mt);
340      }
341    }
342    return liveServers;
343  }
344
345  /**
346   * Wait for the specified master to stop. Removes this thread from list of running threads.
347   * @return Name of master that just went down.
348   */
349  public String waitOnMaster(int serverNumber) {
350    JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber);
351    return waitOnMaster(masterThread);
352  }
353
354  /**
355   * Wait for the specified master to stop. Removes this thread from list of running threads.
356   * @return Name of master that just went down.
357   */
358  public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
359    boolean interrupted = false;
360    while (masterThread.isAlive()) {
361      try {
362        LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
363        masterThread.join();
364      } catch (InterruptedException e) {
365        LOG.error("Interrupted while waiting for {} to finish. Retrying join",
366          masterThread.getName(), e);
367        interrupted = true;
368      }
369    }
370    masterThreads.remove(masterThread);
371    if (interrupted) {
372      Thread.currentThread().interrupt();
373    }
374    return masterThread.getName();
375  }
376
377  /**
378   * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}.
379   */
380  public void join() {
381    if (this.regionThreads != null) {
382      for (Thread t : this.regionThreads) {
383        if (t.isAlive()) {
384          try {
385            Threads.threadDumpingIsAlive(t);
386          } catch (InterruptedException e) {
387            LOG.debug("Interrupted", e);
388          }
389        }
390      }
391    }
392    if (this.masterThreads != null) {
393      for (Thread t : this.masterThreads) {
394        if (t.isAlive()) {
395          try {
396            Threads.threadDumpingIsAlive(t);
397          } catch (InterruptedException e) {
398            LOG.debug("Interrupted", e);
399          }
400        }
401      }
402    }
403  }
404
405  /**
406   * Start the cluster.
407   */
408  public void startup() throws IOException {
409    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
410  }
411
412  /**
413   * Shut down the mini HBase cluster
414   */
415  public void shutdown() {
416    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
417  }
418
419  /**
420   * @param c Configuration to check.
421   * @return True if a 'local' address in hbase.master value.
422   */
423  public static boolean isLocal(final Configuration c) {
424    boolean mode =
425      c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
426    return (mode == HConstants.CLUSTER_IS_LOCAL);
427  }
428
429  /**
430   * Test things basically work. nn
431   */
432  public static void main(String[] args) throws IOException {
433    Configuration conf = HBaseConfiguration.create();
434    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
435    cluster.startup();
436    Connection connection = ConnectionFactory.createConnection(conf);
437    Admin admin = connection.getAdmin();
438    try {
439      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
440      admin.createTable(htd);
441    } finally {
442      admin.close();
443    }
444    connection.close();
445    cluster.shutdown();
446  }
447}