View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedExceptionAction;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.client.Admin;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.regionserver.HRegionServer;
35  import org.apache.hadoop.hbase.security.User;
36  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37  import org.apache.hadoop.hbase.util.Threads;
38  
39  import java.util.concurrent.CopyOnWriteArrayList;
40  import org.apache.hadoop.hbase.master.HMaster;
41  import org.apache.hadoop.hbase.util.JVMClusterUtil;
42  
43  /**
44   * This class creates a single process HBase cluster. One thread is created for
45   * a master and one per region server.
46   *
47   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
48   * to close it all down. {@link #join} the cluster is you want to wait on
49   * shutdown completion.
50   *
51   * <p>Runs master on port 16000 by default.  Because we can't just kill the
52   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
53   * be able to find the master with a remote client to run shutdown.  To use a
54   * port other than 16000, set the hbase.master to a value of 'local:PORT':
55   * that is 'local', not 'localhost', and the port number the master should use
56   * instead of 16000.
57   *
58   */
59  @InterfaceAudience.Public
60  @InterfaceStability.Evolving
61  public class LocalHBaseCluster {
62    private static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63    private final List<JVMClusterUtil.MasterThread> masterThreads =
64      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67    private final static int DEFAULT_NO = 1;
68    /** local mode */
69    public static final String LOCAL = "local";
70    /** 'local:' */
71    public static final String LOCAL_COLON = LOCAL + ":";
72    private final Configuration conf;
73    private final Class<? extends HMaster> masterClass;
74    private final Class<? extends HRegionServer> regionServerClass;
75  
76    /**
77     * Constructor.
78     * @param conf
79     * @throws IOException
80     */
81    public LocalHBaseCluster(final Configuration conf)
82    throws IOException {
83      this(conf, DEFAULT_NO);
84    }
85  
86    /**
87     * Constructor.
88     * @param conf Configuration to use.  Post construction has the master's
89     * address.
90     * @param noRegionServers Count of regionservers to start.
91     * @throws IOException
92     */
93    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94    throws IOException {
95      this(conf, 1, noRegionServers, getMasterImplementation(conf),
96          getRegionServerImplementation(conf));
97    }
98  
99    /**
100    * Constructor.
101    * @param conf Configuration to use.  Post construction has the active master
102    * address.
103    * @param noMasters Count of masters to start.
104    * @param noRegionServers Count of regionservers to start.
105    * @throws IOException
106    */
107   public LocalHBaseCluster(final Configuration conf, final int noMasters,
108       final int noRegionServers)
109   throws IOException {
110     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111         getRegionServerImplementation(conf));
112   }
113 
114   @SuppressWarnings("unchecked")
115   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117        HRegionServer.class);
118   }
119 
120   @SuppressWarnings("unchecked")
121   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123        HMaster.class);
124   }
125 
126   /**
127    * Constructor.
128    * @param conf Configuration to use.  Post construction has the master's
129    * address.
130    * @param noMasters Count of masters to start.
131    * @param noRegionServers Count of regionservers to start.
132    * @param masterClass
133    * @param regionServerClass
134    * @throws IOException
135    */
136   @SuppressWarnings("unchecked")
137   public LocalHBaseCluster(final Configuration conf, final int noMasters,
138     final int noRegionServers, final Class<? extends HMaster> masterClass,
139     final Class<? extends HRegionServer> regionServerClass)
140   throws IOException {
141     this.conf = conf;
142 
143     // Always have masters and regionservers come up on port '0' so we don't
144     // clash over default ports.
145     conf.set(HConstants.MASTER_PORT, "0");
146     conf.set(HConstants.REGIONSERVER_PORT, "0");
147     if (conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1) {
148       conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
149     }
150 
151     this.masterClass = (Class<? extends HMaster>)
152       conf.getClass(HConstants.MASTER_IMPL, masterClass);
153     // Start the HMasters.
154     for (int i = 0; i < noMasters; i++) {
155       addMaster(new Configuration(conf), i);
156     }
157     // Start the HRegionServers.
158     this.regionServerClass =
159       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
160        regionServerClass);
161 
162     for (int i = 0; i < noRegionServers; i++) {
163       addRegionServer(new Configuration(conf), i);
164     }
165   }
166 
167   public JVMClusterUtil.RegionServerThread addRegionServer()
168       throws IOException {
169     return addRegionServer(new Configuration(conf), this.regionThreads.size());
170   }
171 
172   @SuppressWarnings("unchecked")
173   public JVMClusterUtil.RegionServerThread addRegionServer(
174       Configuration config, final int index)
175   throws IOException {
176     // Create each regionserver with its own Configuration instance so each has
177     // its HConnection instance rather than share (see HBASE_INSTANCES down in
178     // the guts of HConnectionManager.
179 
180     // Also, create separate CoordinatedStateManager instance per Server.
181     // This is special case when we have to have more than 1 CoordinatedStateManager
182     // within 1 process.
183     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
184 
185     JVMClusterUtil.RegionServerThread rst =
186         JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
187             .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
188 
189     this.regionThreads.add(rst);
190     return rst;
191   }
192 
193   public JVMClusterUtil.RegionServerThread addRegionServer(
194       final Configuration config, final int index, User user)
195   throws IOException, InterruptedException {
196     return user.runAs(
197         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
198           @Override
199           public JVMClusterUtil.RegionServerThread run() throws Exception {
200             return addRegionServer(config, index);
201           }
202         });
203   }
204 
205   public JVMClusterUtil.MasterThread addMaster() throws IOException {
206     return addMaster(new Configuration(conf), this.masterThreads.size());
207   }
208 
209   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
210   throws IOException {
211     // Create each master with its own Configuration instance so each has
212     // its HConnection instance rather than share (see HBASE_INSTANCES down in
213     // the guts of HConnectionManager.
214 
215     // Also, create separate CoordinatedStateManager instance per Server.
216     // This is special case when we have to have more than 1 CoordinatedStateManager
217     // within 1 process.
218     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
219 
220     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
221         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
222     this.masterThreads.add(mt);
223     return mt;
224   }
225 
226   public JVMClusterUtil.MasterThread addMaster(
227       final Configuration c, final int index, User user)
228   throws IOException, InterruptedException {
229     return user.runAs(
230         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
231           @Override
232           public JVMClusterUtil.MasterThread run() throws Exception {
233             return addMaster(c, index);
234           }
235         });
236   }
237 
238   /**
239    * @param serverNumber
240    * @return region server
241    */
242   public HRegionServer getRegionServer(int serverNumber) {
243     return regionThreads.get(serverNumber).getRegionServer();
244   }
245 
246   /**
247    * @return Read-only list of region server threads.
248    */
249   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
250     return Collections.unmodifiableList(this.regionThreads);
251   }
252 
253   /**
254    * @return List of running servers (Some servers may have been killed or
255    * aborted during lifetime of cluster; these servers are not included in this
256    * list).
257    */
258   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
259     List<JVMClusterUtil.RegionServerThread> liveServers =
260       new ArrayList<JVMClusterUtil.RegionServerThread>();
261     List<RegionServerThread> list = getRegionServers();
262     for (JVMClusterUtil.RegionServerThread rst: list) {
263       if (rst.isAlive()) liveServers.add(rst);
264       else LOG.info("Not alive " + rst.getName());
265     }
266     return liveServers;
267   }
268 
269   /**
270    * @return the Configuration used by this LocalHBaseCluster
271    */
272   public Configuration getConfiguration() {
273     return this.conf;
274   }
275 
276   /**
277    * Wait for the specified region server to stop
278    * Removes this thread from list of running threads.
279    * @param serverNumber
280    * @return Name of region server that just went down.
281    */
282   public String waitOnRegionServer(int serverNumber) {
283     JVMClusterUtil.RegionServerThread regionServerThread =
284       this.regionThreads.remove(serverNumber);
285     while (regionServerThread.isAlive()) {
286       try {
287         LOG.info("Waiting on " +
288           regionServerThread.getRegionServer().toString());
289         regionServerThread.join();
290       } catch (InterruptedException e) {
291         e.printStackTrace();
292       }
293     }
294     return regionServerThread.getName();
295   }
296 
297   /**
298    * Wait for the specified region server to stop
299    * Removes this thread from list of running threads.
300    * @param rst
301    * @return Name of region server that just went down.
302    */
303   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
304     while (rst.isAlive()) {
305       try {
306         LOG.info("Waiting on " +
307           rst.getRegionServer().toString());
308         rst.join();
309       } catch (InterruptedException e) {
310         e.printStackTrace();
311       }
312     }
313     for (int i=0;i<regionThreads.size();i++) {
314       if (regionThreads.get(i) == rst) {
315         regionThreads.remove(i);
316         break;
317       }
318     }
319     return rst.getName();
320   }
321 
322   /**
323    * @param serverNumber
324    * @return the HMaster thread
325    */
326   public HMaster getMaster(int serverNumber) {
327     return masterThreads.get(serverNumber).getMaster();
328   }
329 
330   /**
331    * Gets the current active master, if available.  If no active master, returns
332    * null.
333    * @return the HMaster for the active master
334    */
335   public HMaster getActiveMaster() {
336     for (JVMClusterUtil.MasterThread mt : masterThreads) {
337       // Ensure that the current active master is not stopped.
338       // We don't want to return a stopping master as an active master.
339       if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
340         return mt.getMaster();
341       }
342     }
343     return null;
344   }
345 
346   /**
347    * @return Read-only list of master threads.
348    */
349   public List<JVMClusterUtil.MasterThread> getMasters() {
350     return Collections.unmodifiableList(this.masterThreads);
351   }
352 
353   /**
354    * @return List of running master servers (Some servers may have been killed
355    * or aborted during lifetime of cluster; these servers are not included in
356    * this list).
357    */
358   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
359     List<JVMClusterUtil.MasterThread> liveServers =
360       new ArrayList<JVMClusterUtil.MasterThread>();
361     List<JVMClusterUtil.MasterThread> list = getMasters();
362     for (JVMClusterUtil.MasterThread mt: list) {
363       if (mt.isAlive()) {
364         liveServers.add(mt);
365       }
366     }
367     return liveServers;
368   }
369 
370   /**
371    * Wait for the specified master to stop
372    * Removes this thread from list of running threads.
373    * @param serverNumber
374    * @return Name of master that just went down.
375    */
376   public String waitOnMaster(int serverNumber) {
377     JVMClusterUtil.MasterThread masterThread = this.masterThreads.remove(serverNumber);
378     while (masterThread.isAlive()) {
379       try {
380         LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
381         masterThread.join();
382       } catch (InterruptedException e) {
383         e.printStackTrace();
384       }
385     }
386     return masterThread.getName();
387   }
388 
389   /**
390    * Wait for the specified master to stop
391    * Removes this thread from list of running threads.
392    * @param masterThread
393    * @return Name of master that just went down.
394    */
395   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
396     while (masterThread.isAlive()) {
397       try {
398         LOG.info("Waiting on " +
399           masterThread.getMaster().getServerName().toString());
400         masterThread.join();
401       } catch (InterruptedException e) {
402         e.printStackTrace();
403       }
404     }
405     for (int i=0;i<masterThreads.size();i++) {
406       if (masterThreads.get(i) == masterThread) {
407         masterThreads.remove(i);
408         break;
409       }
410     }
411     return masterThread.getName();
412   }
413 
414   /**
415    * Wait for Mini HBase Cluster to shut down.
416    * Presumes you've already called {@link #shutdown()}.
417    */
418   public void join() {
419     if (this.regionThreads != null) {
420       for(Thread t: this.regionThreads) {
421         if (t.isAlive()) {
422           try {
423             Threads.threadDumpingIsAlive(t);
424           } catch (InterruptedException e) {
425             LOG.debug("Interrupted", e);
426           }
427         }
428       }
429     }
430     if (this.masterThreads != null) {
431       for (Thread t : this.masterThreads) {
432         if (t.isAlive()) {
433           try {
434             Threads.threadDumpingIsAlive(t);
435           } catch (InterruptedException e) {
436             LOG.debug("Interrupted", e);
437           }
438         }
439       }
440     }
441   }
442 
443   /**
444    * Start the cluster.
445    */
446   public void startup() throws IOException {
447     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
448   }
449 
450   /**
451    * Shut down the mini HBase cluster
452    */
453   public void shutdown() {
454     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
455   }
456 
457   /**
458    * @param c Configuration to check.
459    * @return True if a 'local' address in hbase.master value.
460    */
461   public static boolean isLocal(final Configuration c) {
462     boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
463     return(mode == HConstants.CLUSTER_IS_LOCAL);
464   }
465 
466   /**
467    * Test things basically work.
468    * @param args
469    * @throws IOException
470    */
471   public static void main(String[] args) throws IOException {
472     Configuration conf = HBaseConfiguration.create();
473     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
474     cluster.startup();
475     Admin admin = new HBaseAdmin(conf);
476     try {
477       HTableDescriptor htd =
478         new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
479       admin.createTable(htd);
480     } finally {
481       admin.close();
482     }
483     cluster.shutdown();
484   }
485 }