View Javadoc

1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.IOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.client.HBaseAdmin;
32  import org.apache.hadoop.hbase.regionserver.HRegionServer;
33  import org.apache.hadoop.hbase.security.User;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
36  import org.apache.hadoop.hbase.util.Threads;
37  
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import org.apache.hadoop.hbase.master.HMaster;
40  import org.apache.hadoop.hbase.util.JVMClusterUtil;
41  
42  /**
43   * This class creates a single process HBase cluster. One thread is created for
44   * a master and one per region server.
45   *
46   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
47   * to close it all down. {@link #join} the cluster is you want to wait on
48   * shutdown completion.
49   *
50   * <p>Runs master on port 60000 by default.  Because we can't just kill the
51   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
52   * be able to find the master with a remote client to run shutdown.  To use a
53   * port other than 60000, set the hbase.master to a value of 'local:PORT':
54   * that is 'local', not 'localhost', and the port number the master should use
55   * instead of 60000.
56   *
57   */
58  public class LocalHBaseCluster {
59    static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
60    private final List<JVMClusterUtil.MasterThread> masterThreads =
61      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
62    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
63      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
64    private final static int DEFAULT_NO = 1;
65    /** local mode */
66    public static final String LOCAL = "local";
67    /** 'local:' */
68    public static final String LOCAL_COLON = LOCAL + ":";
69    private final Configuration conf;
70    private final Class<? extends HMaster> masterClass;
71    private final Class<? extends HRegionServer> regionServerClass;
72  
73    /**
74     * Constructor.
75     * @param conf
76     * @throws IOException
77     */
78    public LocalHBaseCluster(final Configuration conf)
79    throws IOException {
80      this(conf, DEFAULT_NO);
81    }
82  
83    /**
84     * Constructor.
85     * @param conf Configuration to use.  Post construction has the master's
86     * address.
87     * @param noRegionServers Count of regionservers to start.
88     * @throws IOException
89     */
90    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
91    throws IOException {
92      this(conf, 1, noRegionServers, getMasterImplementation(conf),
93          getRegionServerImplementation(conf));
94    }
95  
96    /**
97     * Constructor.
98     * @param conf Configuration to use.  Post construction has the active master
99     * address.
100    * @param noMasters Count of masters to start.
101    * @param noRegionServers Count of regionservers to start.
102    * @throws IOException
103    */
104   public LocalHBaseCluster(final Configuration conf, final int noMasters,
105       final int noRegionServers)
106   throws IOException {
107     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
108         getRegionServerImplementation(conf));
109   }
110 
111   @SuppressWarnings("unchecked")
112   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
113     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
114        HRegionServer.class);
115   }
116 
117   @SuppressWarnings("unchecked")
118   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
119     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
120        HMaster.class);
121   }
122 
123   /**
124    * Constructor.
125    * @param conf Configuration to use.  Post construction has the master's
126    * address.
127    * @param noMasters Count of masters to start.
128    * @param noRegionServers Count of regionservers to start.
129    * @param masterClass
130    * @param regionServerClass
131    * @throws IOException
132    */
133   @SuppressWarnings("unchecked")
134   public LocalHBaseCluster(final Configuration conf, final int noMasters,
135     final int noRegionServers, final Class<? extends HMaster> masterClass,
136     final Class<? extends HRegionServer> regionServerClass)
137   throws IOException {
138     this.conf = conf;
139     // Always have masters and regionservers come up on port '0' so we don't
140     // clash over default ports.
141     conf.set(HConstants.MASTER_PORT, "0");
142     conf.set(HConstants.REGIONSERVER_PORT, "0");
143     conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
144 
145     this.masterClass = (Class<? extends HMaster>)
146       conf.getClass(HConstants.MASTER_IMPL, masterClass);
147     // Start the HMasters.
148     for (int i = 0; i < noMasters; i++) {
149       addMaster(new Configuration(conf), i);
150     }
151     // Start the HRegionServers.
152     this.regionServerClass =
153       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
154        regionServerClass);
155 
156     for (int i = 0; i < noRegionServers; i++) {
157       addRegionServer(new Configuration(conf), i);
158     }
159   }
160 
161   public JVMClusterUtil.RegionServerThread addRegionServer()
162       throws IOException {
163     return addRegionServer(new Configuration(conf), this.regionThreads.size());
164   }
165 
166   public JVMClusterUtil.RegionServerThread addRegionServer(
167       Configuration config, final int index)
168   throws IOException {
169     // Create each regionserver with its own Configuration instance so each has
170     // its HConnection instance rather than share (see HBASE_INSTANCES down in
171     // the guts of HConnectionManager.
172     JVMClusterUtil.RegionServerThread rst =
173       JVMClusterUtil.createRegionServerThread(config,
174           this.regionServerClass, index);
175     this.regionThreads.add(rst);
176     return rst;
177   }
178 
179   public JVMClusterUtil.RegionServerThread addRegionServer(
180       final Configuration config, final int index, User user)
181   throws IOException, InterruptedException {
182     return user.runAs(
183         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
184           public JVMClusterUtil.RegionServerThread run() throws Exception {
185             return addRegionServer(config, index);
186           }
187         });
188   }
189 
190   public JVMClusterUtil.MasterThread addMaster() throws IOException {
191     return addMaster(new Configuration(conf), this.masterThreads.size());
192   }
193 
194   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
195   throws IOException {
196     // Create each master with its own Configuration instance so each has
197     // its HConnection instance rather than share (see HBASE_INSTANCES down in
198     // the guts of HConnectionManager.
199     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
200         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
201     this.masterThreads.add(mt);
202     return mt;
203   }
204 
205   public JVMClusterUtil.MasterThread addMaster(
206       final Configuration c, final int index, User user)
207   throws IOException, InterruptedException {
208     return user.runAs(
209         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
210           public JVMClusterUtil.MasterThread run() throws Exception {
211             return addMaster(c, index);
212           }
213         });
214   }
215 
216   /**
217    * @param serverNumber
218    * @return region server
219    */
220   public HRegionServer getRegionServer(int serverNumber) {
221     return regionThreads.get(serverNumber).getRegionServer();
222   }
223 
224   /**
225    * @return Read-only list of region server threads.
226    */
227   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
228     return Collections.unmodifiableList(this.regionThreads);
229   }
230 
231   /**
232    * @return List of running servers (Some servers may have been killed or
233    * aborted during lifetime of cluster; these servers are not included in this
234    * list).
235    */
236   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
237     List<JVMClusterUtil.RegionServerThread> liveServers =
238       new ArrayList<JVMClusterUtil.RegionServerThread>();
239     List<RegionServerThread> list = getRegionServers();
240     for (JVMClusterUtil.RegionServerThread rst: list) {
241       if (rst.isAlive()) liveServers.add(rst);
242       else LOG.info("Not alive " + rst.getName());
243     }
244     return liveServers;
245   }
246 
247   /**
248    * Wait for the specified region server to stop
249    * Removes this thread from list of running threads.
250    * @param serverNumber
251    * @return Name of region server that just went down.
252    */
253   public String waitOnRegionServer(int serverNumber) {
254     JVMClusterUtil.RegionServerThread regionServerThread =
255       this.regionThreads.remove(serverNumber);
256     while (regionServerThread.isAlive()) {
257       try {
258         LOG.info("Waiting on " +
259           regionServerThread.getRegionServer().toString());
260         regionServerThread.join();
261       } catch (InterruptedException e) {
262         e.printStackTrace();
263       }
264     }
265     return regionServerThread.getName();
266   }
267 
268   /**
269    * Wait for the specified region server to stop
270    * Removes this thread from list of running threads.
271    * @param rst
272    * @return Name of region server that just went down.
273    */
274   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
275     while (rst.isAlive()) {
276       try {
277         LOG.info("Waiting on " +
278           rst.getRegionServer().toString());
279         rst.join();
280       } catch (InterruptedException e) {
281         e.printStackTrace();
282       }
283     }
284     for (int i=0;i<regionThreads.size();i++) {
285       if (regionThreads.get(i) == rst) {
286         regionThreads.remove(i);
287         break;
288       }
289     }
290     return rst.getName();
291   }
292 
293   /**
294    * @param serverNumber
295    * @return the HMaster thread
296    */
297   public HMaster getMaster(int serverNumber) {
298     return masterThreads.get(serverNumber).getMaster();
299   }
300 
301   /**
302    * Gets the current active master, if available.  If no active master, returns
303    * null.
304    * @return the HMaster for the active master
305    */
306   public HMaster getActiveMaster() {
307     for (JVMClusterUtil.MasterThread mt : masterThreads) {
308       if (mt.getMaster().isActiveMaster()) {
309         // Ensure that the current active master is not stopped.
310         // We don't want to return a stopping master as an active master.
311         if (mt.getMaster().isActiveMaster()  && !mt.getMaster().isStopped()) {
312           return mt.getMaster();
313         }
314       }
315     }
316     return null;
317   }
318 
319   /**
320    * @return Read-only list of master threads.
321    */
322   public List<JVMClusterUtil.MasterThread> getMasters() {
323     return Collections.unmodifiableList(this.masterThreads);
324   }
325 
326   /**
327    * @return List of running master servers (Some servers may have been killed
328    * or aborted during lifetime of cluster; these servers are not included in
329    * this list).
330    */
331   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
332     List<JVMClusterUtil.MasterThread> liveServers =
333       new ArrayList<JVMClusterUtil.MasterThread>();
334     List<JVMClusterUtil.MasterThread> list = getMasters();
335     for (JVMClusterUtil.MasterThread mt: list) {
336       if (mt.isAlive()) {
337         liveServers.add(mt);
338       }
339     }
340     return liveServers;
341   }
342 
343   /**
344    * Wait for the specified master to stop
345    * Removes this thread from list of running threads.
346    * @param serverNumber
347    * @return Name of master that just went down.
348    */
349   public String waitOnMaster(int serverNumber) {
350     JVMClusterUtil.MasterThread masterThread =
351       this.masterThreads.remove(serverNumber);
352     while (masterThread.isAlive()) {
353       try {
354         LOG.info("Waiting on " +
355           masterThread.getMaster().getServerName().toString());
356         masterThread.join();
357       } catch (InterruptedException e) {
358         e.printStackTrace();
359       }
360     }
361     return masterThread.getName();
362   }
363 
364   /**
365    * Wait for the specified master to stop
366    * Removes this thread from list of running threads.
367    * @param masterThread
368    * @return Name of master that just went down.
369    */
370   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
371     while (masterThread.isAlive()) {
372       try {
373         LOG.info("Waiting on " +
374           masterThread.getMaster().getServerName().toString());
375         masterThread.join();
376       } catch (InterruptedException e) {
377         e.printStackTrace();
378       }
379     }
380     for (int i=0;i<masterThreads.size();i++) {
381       if (masterThreads.get(i) == masterThread) {
382         masterThreads.remove(i);
383         break;
384       }
385     }
386     return masterThread.getName();
387   }
388 
389   /**
390    * Wait for Mini HBase Cluster to shut down.
391    * Presumes you've already called {@link #shutdown()}.
392    */
393   public void join() {
394     if (this.regionThreads != null) {
395       for(Thread t: this.regionThreads) {
396         if (t.isAlive()) {
397           try {
398             Threads.threadDumpingIsAlive(t);
399           } catch (InterruptedException e) {
400             LOG.debug("Interrupted", e);
401           }
402         }
403       }
404     }
405     if (this.masterThreads != null) {
406       for (Thread t : this.masterThreads) {
407         if (t.isAlive()) {
408           try {
409             Threads.threadDumpingIsAlive(t);
410           } catch (InterruptedException e) {
411             LOG.debug("Interrupted", e);
412           }
413         }
414       }
415     }
416   }
417 
418   /**
419    * Start the cluster.
420    */
421   public void startup() throws IOException {
422     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
423   }
424 
425   /**
426    * Shut down the mini HBase cluster
427    */
428   public void shutdown() {
429     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
430   }
431 
432   /**
433    * @param c Configuration to check.
434    * @return True if a 'local' address in hbase.master value.
435    */
436   public static boolean isLocal(final Configuration c) {
437     boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
438     return(mode == HConstants.CLUSTER_IS_LOCAL);
439   }
440 
441   /**
442    * Test things basically work.
443    * @param args
444    * @throws IOException
445    */
446   public static void main(String[] args) throws IOException {
447     Configuration conf = HBaseConfiguration.create();
448     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
449     cluster.startup();
450     HBaseAdmin admin = new HBaseAdmin(conf);
451     HTableDescriptor htd =
452       new HTableDescriptor(Bytes.toBytes(cluster.getClass().getName()));
453     admin.createTable(htd);
454     cluster.shutdown();
455   }
456 }