001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.client.Admin;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
032import org.apache.hadoop.hbase.master.HMaster;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.util.JVMClusterUtil;
036import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This class creates a single process HBase cluster. One thread is created for a master and one per
044 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to
045 * close it all down. {@link #join} the cluster is you want to wait on shutdown completion.
046 * <p>
047 * Runs master on port 16000 by default. Because we can't just kill the process -- not till
048 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote
049 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of
050 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead
051 * of 16000.
052 */
053@InterfaceAudience.Private
054public class LocalHBaseCluster {
055  private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
056  private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
057  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
058    new CopyOnWriteArrayList<>();
059  private final static int DEFAULT_NO = 1;
060  /** local mode */
061  public static final String LOCAL = "local";
062  /** 'local:' */
063  public static final String LOCAL_COLON = LOCAL + ":";
064  public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports";
065
066  private final Configuration conf;
067  private final Class<? extends HMaster> masterClass;
068  private final Class<? extends HRegionServer> regionServerClass;
069
070  /**
071   * Constructor.
072   */
073  public LocalHBaseCluster(final Configuration conf) throws IOException {
074    this(conf, DEFAULT_NO);
075  }
076
077  /**
078   * Constructor.
079   * @param conf            Configuration to use. Post construction has the master's address.
080   * @param noRegionServers Count of regionservers to start.
081   */
082  public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException {
083    this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
084      getRegionServerImplementation(conf));
085  }
086
087  /**
088   * Constructor.
089   * @param conf            Configuration to use. Post construction has the active master address.
090   * @param noMasters       Count of masters to start.
091   * @param noRegionServers Count of regionservers to start.
092   */
093  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers)
094    throws IOException {
095    this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
096      getRegionServerImplementation(conf));
097  }
098
099  @SuppressWarnings("unchecked")
100  private static Class<? extends HRegionServer>
101    getRegionServerImplementation(final Configuration conf) {
102    return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
103      HRegionServer.class);
104  }
105
106  @SuppressWarnings("unchecked")
107  private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
108    return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class);
109  }
110
111  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
112    final Class<? extends HMaster> masterClass,
113    final Class<? extends HRegionServer> regionServerClass) throws IOException {
114    this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
115  }
116
117  /**
118   * Constructor.
119   * @param conf            Configuration to use. Post construction has the master's address.
120   * @param noMasters       Count of masters to start.
121   * @param noRegionServers Count of regionservers to start.
122   */
123  @SuppressWarnings("unchecked")
124  public LocalHBaseCluster(final Configuration conf, final int noMasters,
125    final int noAlwaysStandByMasters, final int noRegionServers,
126    final Class<? extends HMaster> masterClass,
127    final Class<? extends HRegionServer> regionServerClass) throws IOException {
128    this.conf = conf;
129
130    // When active, if a port selection is default then we switch to random
131    if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) {
132      if (
133        conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)
134            == HConstants.DEFAULT_MASTER_PORT
135      ) {
136        LOG.debug("Setting Master Port to random.");
137        conf.set(HConstants.MASTER_PORT, "0");
138      }
139      if (
140        conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT)
141            == HConstants.DEFAULT_REGIONSERVER_PORT
142      ) {
143        LOG.debug("Setting RegionServer Port to random.");
144        conf.set(HConstants.REGIONSERVER_PORT, "0");
145      }
146      // treat info ports special; expressly don't change '-1' (keep off)
147      // in case we make that the default behavior.
148      if (
149        conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1
150          && conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
151            HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT
152      ) {
153        LOG.debug("Setting RS InfoServer Port to random.");
154        conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
155      }
156      if (
157        conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1
158          && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
159              == HConstants.DEFAULT_MASTER_INFOPORT
160      ) {
161        LOG.debug("Setting Master InfoServer Port to random.");
162        conf.set(HConstants.MASTER_INFO_PORT, "0");
163      }
164    }
165
166    this.masterClass =
167      (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass);
168    // Start the HMasters.
169    int i;
170    for (i = 0; i < noMasters; i++) {
171      addMaster(new Configuration(conf), i);
172    }
173    for (int j = 0; j < noAlwaysStandByMasters; j++) {
174      Configuration c = new Configuration(conf);
175      c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
176      addMaster(c, i + j);
177    }
178    // Start the HRegionServers.
179    this.regionServerClass = (Class<? extends HRegionServer>) conf
180      .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass);
181
182    for (int j = 0; j < noRegionServers; j++) {
183      addRegionServer(new Configuration(conf), j);
184    }
185  }
186
187  public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
188    return addRegionServer(new Configuration(conf), this.regionThreads.size());
189  }
190
191  @SuppressWarnings("unchecked")
192  public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index)
193    throws IOException {
194    // Create each regionserver with its own Configuration instance so each has
195    // its Connection instance rather than share (see HBASE_INSTANCES down in
196    // the guts of ConnectionManager).
197    JVMClusterUtil.RegionServerThread rst =
198      JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
199        .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
200
201    this.regionThreads.add(rst);
202    return rst;
203  }
204
205  public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config,
206    final int index, User user) throws IOException, InterruptedException {
207    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
208      @Override
209      public JVMClusterUtil.RegionServerThread run() throws Exception {
210        return addRegionServer(config, index);
211      }
212    });
213  }
214
215  public JVMClusterUtil.MasterThread addMaster() throws IOException {
216    return addMaster(new Configuration(conf), this.masterThreads.size());
217  }
218
219  public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
220    throws IOException {
221    // Create each master with its own Configuration instance so each has
222    // its Connection instance rather than share (see HBASE_INSTANCES down in
223    // the guts of ConnectionManager.
224    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
225      (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
226    this.masterThreads.add(mt);
227    // Refresh the master address config.
228    List<String> masterHostPorts = new ArrayList<>();
229    getMasters().forEach(masterThread -> masterHostPorts
230      .add(masterThread.getMaster().getServerName().getAddress().toString()));
231    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
232    return mt;
233  }
234
235  public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user)
236    throws IOException, InterruptedException {
237    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
238      @Override
239      public JVMClusterUtil.MasterThread run() throws Exception {
240        return addMaster(c, index);
241      }
242    });
243  }
244
245  /** Returns region server */
246  public HRegionServer getRegionServer(int serverNumber) {
247    return regionThreads.get(serverNumber).getRegionServer();
248  }
249
250  /** Returns Read-only list of region server threads. */
251  public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
252    return Collections.unmodifiableList(this.regionThreads);
253  }
254
255  /**
256   * @return List of running servers (Some servers may have been killed or aborted during lifetime
257   *         of cluster; these servers are not included in this list).
258   */
259  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
260    List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>();
261    List<RegionServerThread> list = getRegionServers();
262    for (JVMClusterUtil.RegionServerThread rst : list) {
263      if (rst.isAlive()) liveServers.add(rst);
264      else LOG.info("Not alive " + rst.getName());
265    }
266    return liveServers;
267  }
268
269  /** Returns the Configuration used by this LocalHBaseCluster */
270  public Configuration getConfiguration() {
271    return this.conf;
272  }
273
274  /**
275   * Wait for the specified region server to stop. Removes this thread from list of running threads.
276   * @return Name of region server that just went down.
277   */
278  public String waitOnRegionServer(int serverNumber) {
279    JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber);
280    return waitOnRegionServer(regionServerThread);
281  }
282
283  /**
284   * Wait for the specified region server to stop. Removes this thread from list of running threads.
285   * @return Name of region server that just went down.
286   */
287  public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
288    boolean interrupted = false;
289    while (rst.isAlive()) {
290      try {
291        LOG.info("Waiting on " + rst.getRegionServer().toString());
292        rst.join();
293      } catch (InterruptedException e) {
294        LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e);
295        interrupted = true;
296      }
297    }
298    regionThreads.remove(rst);
299    if (interrupted) {
300      Thread.currentThread().interrupt();
301    }
302    return rst.getName();
303  }
304
305  /** Returns the HMaster thread */
306  public HMaster getMaster(int serverNumber) {
307    return masterThreads.get(serverNumber).getMaster();
308  }
309
310  /**
311   * Gets the current active master, if available. If no active master, returns null.
312   * @return the HMaster for the active master
313   */
314  public HMaster getActiveMaster() {
315    for (JVMClusterUtil.MasterThread mt : masterThreads) {
316      // Ensure that the current active master is not stopped.
317      // We don't want to return a stopping master as an active master.
318      if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
319        return mt.getMaster();
320      }
321    }
322    return null;
323  }
324
325  /** Returns Read-only list of master threads. */
326  public List<JVMClusterUtil.MasterThread> getMasters() {
327    return Collections.unmodifiableList(this.masterThreads);
328  }
329
330  /**
331   * @return List of running master servers (Some servers may have been killed or aborted during
332   *         lifetime of cluster; these servers are not included in this list).
333   */
334  public List<JVMClusterUtil.MasterThread> getLiveMasters() {
335    List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>();
336    List<JVMClusterUtil.MasterThread> list = getMasters();
337    for (JVMClusterUtil.MasterThread mt : list) {
338      if (mt.isAlive()) {
339        liveServers.add(mt);
340      }
341    }
342    return liveServers;
343  }
344
345  /**
346   * Wait for the specified master to stop. Removes this thread from list of running threads.
347   * @return Name of master that just went down.
348   */
349  public String waitOnMaster(int serverNumber) {
350    JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber);
351    return waitOnMaster(masterThread);
352  }
353
354  /**
355   * Wait for the specified master to stop. Removes this thread from list of running threads.
356   * @return Name of master that just went down.
357   */
358  public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
359    boolean interrupted = false;
360    while (masterThread.isAlive()) {
361      try {
362        LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
363        masterThread.join();
364      } catch (InterruptedException e) {
365        LOG.error("Interrupted while waiting for {} to finish. Retrying join",
366          masterThread.getName(), e);
367        interrupted = true;
368      }
369    }
370    masterThreads.remove(masterThread);
371    if (interrupted) {
372      Thread.currentThread().interrupt();
373    }
374    return masterThread.getName();
375  }
376
377  /**
378   * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}.
379   */
380  public void join() {
381    if (this.regionThreads != null) {
382      for (Thread t : this.regionThreads) {
383        if (t.isAlive()) {
384          try {
385            Threads.threadDumpingIsAlive(t);
386          } catch (InterruptedException e) {
387            LOG.debug("Interrupted", e);
388          }
389        }
390      }
391    }
392    if (this.masterThreads != null) {
393      for (Thread t : this.masterThreads) {
394        if (t.isAlive()) {
395          try {
396            Threads.threadDumpingIsAlive(t);
397          } catch (InterruptedException e) {
398            LOG.debug("Interrupted", e);
399          }
400        }
401      }
402    }
403  }
404
405  /**
406   * Start the cluster.
407   */
408  public void startup() throws IOException {
409    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
410  }
411
412  /**
413   * Shut down the mini HBase cluster
414   */
415  public void shutdown() {
416    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
417  }
418
419  /**
420   * @param c Configuration to check.
421   * @return True if a 'local' address in hbase.master value.
422   */
423  public static boolean isLocal(final Configuration c) {
424    boolean mode =
425      c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
426    return (mode == HConstants.CLUSTER_IS_LOCAL);
427  }
428
429  /**
430   * Test things basically work.
431   */
432  public static void main(String[] args) throws IOException {
433    Configuration conf = HBaseConfiguration.create();
434    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
435    cluster.startup();
436    try (Connection connection = ConnectionFactory.createConnection(conf);
437      Admin admin = connection.getAdmin()) {
438      TableDescriptor htd =
439        TableDescriptorBuilder.newBuilder(TableName.valueOf(cluster.getClass().getName())).build();
440      admin.createTable(htd);
441    } finally {
442      cluster.shutdown();
443    }
444  }
445}