View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedExceptionAction;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.classification.InterfaceAudience;
30  import org.apache.hadoop.classification.InterfaceStability;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.regionserver.HRegionServer;
34  import org.apache.hadoop.hbase.security.User;
35  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
36  import org.apache.hadoop.hbase.util.Threads;
37  
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import org.apache.hadoop.hbase.master.HMaster;
40  import org.apache.hadoop.hbase.util.JVMClusterUtil;
41  
42  /**
43   * This class creates a single process HBase cluster. One thread is created for
44   * a master and one per region server.
45   *
46   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
47   * to close it all down. {@link #join} the cluster is you want to wait on
48   * shutdown completion.
49   *
50   * <p>Runs master on port 16000 by default.  Because we can't just kill the
51   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
52   * be able to find the master with a remote client to run shutdown.  To use a
53   * port other than 16000, set the hbase.master to a value of 'local:PORT':
54   * that is 'local', not 'localhost', and the port number the master should use
55   * instead of 16000.
56   *
57   */
58  @InterfaceAudience.Public
59  @InterfaceStability.Evolving
60  public class LocalHBaseCluster {
61    static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
62    private final List<JVMClusterUtil.MasterThread> masterThreads =
63      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
64    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
65      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
66    private final static int DEFAULT_NO = 1;
67    /** local mode */
68    public static final String LOCAL = "local";
69    /** 'local:' */
70    public static final String LOCAL_COLON = LOCAL + ":";
71    private final Configuration conf;
72    private final Class<? extends HMaster> masterClass;
73    private final Class<? extends HRegionServer> regionServerClass;
74  
75    /**
76     * Constructor.
77     * @param conf
78     * @throws IOException
79     */
80    public LocalHBaseCluster(final Configuration conf)
81    throws IOException {
82      this(conf, DEFAULT_NO);
83    }
84  
85    /**
86     * Constructor.
87     * @param conf Configuration to use.  Post construction has the master's
88     * address.
89     * @param noRegionServers Count of regionservers to start.
90     * @throws IOException
91     */
92    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
93    throws IOException {
94      this(conf, 1, noRegionServers, getMasterImplementation(conf),
95          getRegionServerImplementation(conf));
96    }
97  
98    /**
99     * Constructor.
100    * @param conf Configuration to use.  Post construction has the active master
101    * address.
102    * @param noMasters Count of masters to start.
103    * @param noRegionServers Count of regionservers to start.
104    * @throws IOException
105    */
106   public LocalHBaseCluster(final Configuration conf, final int noMasters,
107       final int noRegionServers)
108   throws IOException {
109     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
110         getRegionServerImplementation(conf));
111   }
112 
113   @SuppressWarnings("unchecked")
114   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
115     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
116        HRegionServer.class);
117   }
118 
119   @SuppressWarnings("unchecked")
120   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
121     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
122        HMaster.class);
123   }
124 
125   /**
126    * Constructor.
127    * @param conf Configuration to use.  Post construction has the master's
128    * address.
129    * @param noMasters Count of masters to start.
130    * @param noRegionServers Count of regionservers to start.
131    * @param masterClass
132    * @param regionServerClass
133    * @throws IOException
134    */
135   @SuppressWarnings("unchecked")
136   public LocalHBaseCluster(final Configuration conf, final int noMasters,
137     final int noRegionServers, final Class<? extends HMaster> masterClass,
138     final Class<? extends HRegionServer> regionServerClass)
139   throws IOException {
140     this.conf = conf;
141 
142     // Always have masters and regionservers come up on port '0' so we don't
143     // clash over default ports.
144     conf.set(HConstants.MASTER_PORT, "0");
145     conf.set(HConstants.REGIONSERVER_PORT, "0");
146     conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
147 
148     this.masterClass = (Class<? extends HMaster>)
149       conf.getClass(HConstants.MASTER_IMPL, masterClass);
150     // Start the HMasters.
151     for (int i = 0; i < noMasters; i++) {
152       addMaster(new Configuration(conf), i);
153     }
154     // Start the HRegionServers.
155     this.regionServerClass =
156       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
157        regionServerClass);
158 
159     for (int i = 0; i < noRegionServers; i++) {
160       addRegionServer(new Configuration(conf), i);
161     }
162   }
163 
164   public JVMClusterUtil.RegionServerThread addRegionServer()
165       throws IOException {
166     return addRegionServer(new Configuration(conf), this.regionThreads.size());
167   }
168 
169   public JVMClusterUtil.RegionServerThread addRegionServer(
170       Configuration config, final int index)
171   throws IOException {
172     // Create each regionserver with its own Configuration instance so each has
173     // its HConnection instance rather than share (see HBASE_INSTANCES down in
174     // the guts of HConnectionManager.
175 
176     // Also, create separate CoordinatedStateManager instance per Server.
177     // This is special case when we have to have more than 1 CoordinatedStateManager
178     // within 1 process.
179     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
180 
181     JVMClusterUtil.RegionServerThread rst =
182       JVMClusterUtil.createRegionServerThread(config, cp,
183           this.regionServerClass, index);
184     this.regionThreads.add(rst);
185     return rst;
186   }
187 
188   public JVMClusterUtil.RegionServerThread addRegionServer(
189       final Configuration config, final int index, User user)
190   throws IOException, InterruptedException {
191     return user.runAs(
192         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
193           public JVMClusterUtil.RegionServerThread run() throws Exception {
194             return addRegionServer(config, index);
195           }
196         });
197   }
198 
199   public JVMClusterUtil.MasterThread addMaster() throws IOException {
200     return addMaster(new Configuration(conf), this.masterThreads.size());
201   }
202 
203   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
204   throws IOException {
205     // Create each master with its own Configuration instance so each has
206     // its HConnection instance rather than share (see HBASE_INSTANCES down in
207     // the guts of HConnectionManager.
208 
209     // Also, create separate CoordinatedStateManager instance per Server.
210     // This is special case when we have to have more than 1 CoordinatedStateManager
211     // within 1 process.
212     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
213 
214     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
215         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
216     this.masterThreads.add(mt);
217     return mt;
218   }
219 
220   public JVMClusterUtil.MasterThread addMaster(
221       final Configuration c, final int index, User user)
222   throws IOException, InterruptedException {
223     return user.runAs(
224         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
225           public JVMClusterUtil.MasterThread run() throws Exception {
226             return addMaster(c, index);
227           }
228         });
229   }
230 
231   /**
232    * @param serverNumber
233    * @return region server
234    */
235   public HRegionServer getRegionServer(int serverNumber) {
236     return regionThreads.get(serverNumber).getRegionServer();
237   }
238 
239   /**
240    * @return Read-only list of region server threads.
241    */
242   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
243     return Collections.unmodifiableList(this.regionThreads);
244   }
245 
246   /**
247    * @return List of running servers (Some servers may have been killed or
248    * aborted during lifetime of cluster; these servers are not included in this
249    * list).
250    */
251   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
252     List<JVMClusterUtil.RegionServerThread> liveServers =
253       new ArrayList<JVMClusterUtil.RegionServerThread>();
254     List<RegionServerThread> list = getRegionServers();
255     for (JVMClusterUtil.RegionServerThread rst: list) {
256       if (rst.isAlive()) liveServers.add(rst);
257       else LOG.info("Not alive " + rst.getName());
258     }
259     return liveServers;
260   }
261 
262   /**
263    * Wait for the specified region server to stop
264    * Removes this thread from list of running threads.
265    * @param serverNumber
266    * @return Name of region server that just went down.
267    */
268   public String waitOnRegionServer(int serverNumber) {
269     JVMClusterUtil.RegionServerThread regionServerThread =
270       this.regionThreads.remove(serverNumber);
271     while (regionServerThread.isAlive()) {
272       try {
273         LOG.info("Waiting on " +
274           regionServerThread.getRegionServer().toString());
275         regionServerThread.join();
276       } catch (InterruptedException e) {
277         e.printStackTrace();
278       }
279     }
280     return regionServerThread.getName();
281   }
282 
283   /**
284    * Wait for the specified region server to stop
285    * Removes this thread from list of running threads.
286    * @param rst
287    * @return Name of region server that just went down.
288    */
289   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
290     while (rst.isAlive()) {
291       try {
292         LOG.info("Waiting on " +
293           rst.getRegionServer().toString());
294         rst.join();
295       } catch (InterruptedException e) {
296         e.printStackTrace();
297       }
298     }
299     for (int i=0;i<regionThreads.size();i++) {
300       if (regionThreads.get(i) == rst) {
301         regionThreads.remove(i);
302         break;
303       }
304     }
305     return rst.getName();
306   }
307 
308   /**
309    * @param serverNumber
310    * @return the HMaster thread
311    */
312   public HMaster getMaster(int serverNumber) {
313     return masterThreads.get(serverNumber).getMaster();
314   }
315 
316   /**
317    * Gets the current active master, if available.  If no active master, returns
318    * null.
319    * @return the HMaster for the active master
320    */
321   public HMaster getActiveMaster() {
322     for (JVMClusterUtil.MasterThread mt : masterThreads) {
323       if (mt.getMaster().isActiveMaster()) {
324         // Ensure that the current active master is not stopped.
325         // We don't want to return a stopping master as an active master.
326         if (mt.getMaster().isActiveMaster()  && !mt.getMaster().isStopped()) {
327           return mt.getMaster();
328         }
329       }
330     }
331     return null;
332   }
333 
334   /**
335    * @return Read-only list of master threads.
336    */
337   public List<JVMClusterUtil.MasterThread> getMasters() {
338     return Collections.unmodifiableList(this.masterThreads);
339   }
340 
341   /**
342    * @return List of running master servers (Some servers may have been killed
343    * or aborted during lifetime of cluster; these servers are not included in
344    * this list).
345    */
346   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
347     List<JVMClusterUtil.MasterThread> liveServers =
348       new ArrayList<JVMClusterUtil.MasterThread>();
349     List<JVMClusterUtil.MasterThread> list = getMasters();
350     for (JVMClusterUtil.MasterThread mt: list) {
351       if (mt.isAlive()) {
352         liveServers.add(mt);
353       }
354     }
355     return liveServers;
356   }
357 
358   /**
359    * Wait for the specified master to stop
360    * Removes this thread from list of running threads.
361    * @param serverNumber
362    * @return Name of master that just went down.
363    */
364   public String waitOnMaster(int serverNumber) {
365     JVMClusterUtil.MasterThread masterThread =
366       this.masterThreads.remove(serverNumber);
367     while (masterThread.isAlive()) {
368       try {
369         LOG.info("Waiting on " +
370           masterThread.getMaster().getServerName().toString());
371         masterThread.join();
372       } catch (InterruptedException e) {
373         e.printStackTrace();
374       }
375     }
376     return masterThread.getName();
377   }
378 
379   /**
380    * Wait for the specified master to stop
381    * Removes this thread from list of running threads.
382    * @param masterThread
383    * @return Name of master that just went down.
384    */
385   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
386     while (masterThread.isAlive()) {
387       try {
388         LOG.info("Waiting on " +
389           masterThread.getMaster().getServerName().toString());
390         masterThread.join();
391       } catch (InterruptedException e) {
392         e.printStackTrace();
393       }
394     }
395     for (int i=0;i<masterThreads.size();i++) {
396       if (masterThreads.get(i) == masterThread) {
397         masterThreads.remove(i);
398         break;
399       }
400     }
401     return masterThread.getName();
402   }
403 
404   /**
405    * Wait for Mini HBase Cluster to shut down.
406    * Presumes you've already called {@link #shutdown()}.
407    */
408   public void join() {
409     if (this.regionThreads != null) {
410       for(Thread t: this.regionThreads) {
411         if (t.isAlive()) {
412           try {
413             Threads.threadDumpingIsAlive(t);
414           } catch (InterruptedException e) {
415             LOG.debug("Interrupted", e);
416           }
417         }
418       }
419     }
420     if (this.masterThreads != null) {
421       for (Thread t : this.masterThreads) {
422         if (t.isAlive()) {
423           try {
424             Threads.threadDumpingIsAlive(t);
425           } catch (InterruptedException e) {
426             LOG.debug("Interrupted", e);
427           }
428         }
429       }
430     }
431   }
432 
433   /**
434    * Start the cluster.
435    */
436   public void startup() throws IOException {
437     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
438   }
439 
440   /**
441    * Shut down the mini HBase cluster
442    */
443   public void shutdown() {
444     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
445   }
446 
447   /**
448    * @param c Configuration to check.
449    * @return True if a 'local' address in hbase.master value.
450    */
451   public static boolean isLocal(final Configuration c) {
452     boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
453     return(mode == HConstants.CLUSTER_IS_LOCAL);
454   }
455 
456   /**
457    * Test things basically work.
458    * @param args
459    * @throws IOException
460    */
461   public static void main(String[] args) throws IOException {
462     Configuration conf = HBaseConfiguration.create();
463     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
464     cluster.startup();
465     HBaseAdmin admin = new HBaseAdmin(conf);
466     HTableDescriptor htd =
467       new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
468     admin.createTable(htd);
469     cluster.shutdown();
470   }
471 }