View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedExceptionAction;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.client.Admin;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.regionserver.HRegionServer;
36  import org.apache.hadoop.hbase.security.User;
37  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
38  import org.apache.hadoop.hbase.util.Threads;
39  
40  import java.util.concurrent.CopyOnWriteArrayList;
41  import org.apache.hadoop.hbase.master.HMaster;
42  import org.apache.hadoop.hbase.util.JVMClusterUtil;
43  
44  /**
45   * This class creates a single process HBase cluster. One thread is created for
46   * a master and one per region server.
47   *
48   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
49   * to close it all down. {@link #join} the cluster is you want to wait on
50   * shutdown completion.
51   *
52   * <p>Runs master on port 16000 by default.  Because we can't just kill the
53   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
54   * be able to find the master with a remote client to run shutdown.  To use a
55   * port other than 16000, set the hbase.master to a value of 'local:PORT':
56   * that is 'local', not 'localhost', and the port number the master should use
57   * instead of 16000.
58   *
59   */
60  @InterfaceAudience.Public
61  @InterfaceStability.Evolving
62  public class LocalHBaseCluster {
63    static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
64    private final List<JVMClusterUtil.MasterThread> masterThreads =
65      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
66    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
67      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
68    private final static int DEFAULT_NO = 1;
69    /** local mode */
70    public static final String LOCAL = "local";
71    /** 'local:' */
72    public static final String LOCAL_COLON = LOCAL + ":";
73    private final Configuration conf;
74    private final Class<? extends HMaster> masterClass;
75    private final Class<? extends HRegionServer> regionServerClass;
76  
77    /**
78     * Constructor.
79     * @param conf
80     * @throws IOException
81     */
82    public LocalHBaseCluster(final Configuration conf)
83    throws IOException {
84      this(conf, DEFAULT_NO);
85    }
86  
87    /**
88     * Constructor.
89     * @param conf Configuration to use.  Post construction has the master's
90     * address.
91     * @param noRegionServers Count of regionservers to start.
92     * @throws IOException
93     */
94    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
95    throws IOException {
96      this(conf, 1, noRegionServers, getMasterImplementation(conf),
97          getRegionServerImplementation(conf));
98    }
99  
100   /**
101    * Constructor.
102    * @param conf Configuration to use.  Post construction has the active master
103    * address.
104    * @param noMasters Count of masters to start.
105    * @param noRegionServers Count of regionservers to start.
106    * @throws IOException
107    */
108   public LocalHBaseCluster(final Configuration conf, final int noMasters,
109       final int noRegionServers)
110   throws IOException {
111     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
112         getRegionServerImplementation(conf));
113   }
114 
115   @SuppressWarnings("unchecked")
116   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
117     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
118        HRegionServer.class);
119   }
120 
121   @SuppressWarnings("unchecked")
122   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
123     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
124        HMaster.class);
125   }
126 
127   /**
128    * Constructor.
129    * @param conf Configuration to use.  Post construction has the master's
130    * address.
131    * @param noMasters Count of masters to start.
132    * @param noRegionServers Count of regionservers to start.
133    * @param masterClass
134    * @param regionServerClass
135    * @throws IOException
136    */
137   @SuppressWarnings("unchecked")
138   public LocalHBaseCluster(final Configuration conf, final int noMasters,
139     final int noRegionServers, final Class<? extends HMaster> masterClass,
140     final Class<? extends HRegionServer> regionServerClass)
141   throws IOException {
142     this.conf = conf;
143 
144     // Always have masters and regionservers come up on port '0' so we don't
145     // clash over default ports.
146     conf.set(HConstants.MASTER_PORT, "0");
147     conf.set(HConstants.REGIONSERVER_PORT, "0");
148     conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
149 
150     this.masterClass = (Class<? extends HMaster>)
151       conf.getClass(HConstants.MASTER_IMPL, masterClass);
152     // Start the HMasters.
153     for (int i = 0; i < noMasters; i++) {
154       addMaster(new Configuration(conf), i);
155     }
156     // Start the HRegionServers.
157     this.regionServerClass =
158       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
159        regionServerClass);
160 
161     for (int i = 0; i < noRegionServers; i++) {
162       addRegionServer(new Configuration(conf), i);
163     }
164   }
165 
166   public JVMClusterUtil.RegionServerThread addRegionServer()
167       throws IOException {
168     return addRegionServer(new Configuration(conf), this.regionThreads.size());
169   }
170 
171   @SuppressWarnings("unchecked")
172   public JVMClusterUtil.RegionServerThread addRegionServer(
173       Configuration config, final int index)
174   throws IOException {
175     // Create each regionserver with its own Configuration instance so each has
176     // its HConnection instance rather than share (see HBASE_INSTANCES down in
177     // the guts of ConnectionManager).
178 
179     // Also, create separate CoordinatedStateManager instance per Server.
180     // This is special case when we have to have more than 1 CoordinatedStateManager
181     // within 1 process.
182     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
183 
184     JVMClusterUtil.RegionServerThread rst =
185         JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
186             .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
187 
188     this.regionThreads.add(rst);
189     return rst;
190   }
191 
192   public JVMClusterUtil.RegionServerThread addRegionServer(
193       final Configuration config, final int index, User user)
194   throws IOException, InterruptedException {
195     return user.runAs(
196         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
197           public JVMClusterUtil.RegionServerThread run() throws Exception {
198             return addRegionServer(config, index);
199           }
200         });
201   }
202 
203   public JVMClusterUtil.MasterThread addMaster() throws IOException {
204     return addMaster(new Configuration(conf), this.masterThreads.size());
205   }
206 
207   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
208   throws IOException {
209     // Create each master with its own Configuration instance so each has
210     // its HConnection instance rather than share (see HBASE_INSTANCES down in
211     // the guts of ConnectionManager.
212 
213     // Also, create separate CoordinatedStateManager instance per Server.
214     // This is special case when we have to have more than 1 CoordinatedStateManager
215     // within 1 process.
216     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
217 
218     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
219         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
220     this.masterThreads.add(mt);
221     return mt;
222   }
223 
224   public JVMClusterUtil.MasterThread addMaster(
225       final Configuration c, final int index, User user)
226   throws IOException, InterruptedException {
227     return user.runAs(
228         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
229           public JVMClusterUtil.MasterThread run() throws Exception {
230             return addMaster(c, index);
231           }
232         });
233   }
234 
235   /**
236    * @param serverNumber
237    * @return region server
238    */
239   public HRegionServer getRegionServer(int serverNumber) {
240     return regionThreads.get(serverNumber).getRegionServer();
241   }
242 
243   /**
244    * @return Read-only list of region server threads.
245    */
246   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
247     return Collections.unmodifiableList(this.regionThreads);
248   }
249 
250   /**
251    * @return List of running servers (Some servers may have been killed or
252    * aborted during lifetime of cluster; these servers are not included in this
253    * list).
254    */
255   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
256     List<JVMClusterUtil.RegionServerThread> liveServers =
257       new ArrayList<JVMClusterUtil.RegionServerThread>();
258     List<RegionServerThread> list = getRegionServers();
259     for (JVMClusterUtil.RegionServerThread rst: list) {
260       if (rst.isAlive()) liveServers.add(rst);
261       else LOG.info("Not alive " + rst.getName());
262     }
263     return liveServers;
264   }
265 
266   /**
267    * @return the Configuration used by this LocalHBaseCluster
268    */
269   public Configuration getConfiguration() {
270     return this.conf;
271   }
272 
273   /**
274    * Wait for the specified region server to stop
275    * Removes this thread from list of running threads.
276    * @param serverNumber
277    * @return Name of region server that just went down.
278    */
279   public String waitOnRegionServer(int serverNumber) {
280     JVMClusterUtil.RegionServerThread regionServerThread =
281       this.regionThreads.remove(serverNumber);
282     while (regionServerThread.isAlive()) {
283       try {
284         LOG.info("Waiting on " +
285           regionServerThread.getRegionServer().toString());
286         regionServerThread.join();
287       } catch (InterruptedException e) {
288         e.printStackTrace();
289       }
290     }
291     return regionServerThread.getName();
292   }
293 
294   /**
295    * Wait for the specified region server to stop
296    * Removes this thread from list of running threads.
297    * @param rst
298    * @return Name of region server that just went down.
299    */
300   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
301     while (rst.isAlive()) {
302       try {
303         LOG.info("Waiting on " +
304           rst.getRegionServer().toString());
305         rst.join();
306       } catch (InterruptedException e) {
307         e.printStackTrace();
308       }
309     }
310     for (int i=0;i<regionThreads.size();i++) {
311       if (regionThreads.get(i) == rst) {
312         regionThreads.remove(i);
313         break;
314       }
315     }
316     return rst.getName();
317   }
318 
319   /**
320    * @param serverNumber
321    * @return the HMaster thread
322    */
323   public HMaster getMaster(int serverNumber) {
324     return masterThreads.get(serverNumber).getMaster();
325   }
326 
327   /**
328    * Gets the current active master, if available.  If no active master, returns
329    * null.
330    * @return the HMaster for the active master
331    */
332   public HMaster getActiveMaster() {
333     for (JVMClusterUtil.MasterThread mt : masterThreads) {
334       if (mt.getMaster().isActiveMaster()) {
335         // Ensure that the current active master is not stopped.
336         // We don't want to return a stopping master as an active master.
337         if (mt.getMaster().isActiveMaster()  && !mt.getMaster().isStopped()) {
338           return mt.getMaster();
339         }
340       }
341     }
342     return null;
343   }
344 
345   /**
346    * @return Read-only list of master threads.
347    */
348   public List<JVMClusterUtil.MasterThread> getMasters() {
349     return Collections.unmodifiableList(this.masterThreads);
350   }
351 
352   /**
353    * @return List of running master servers (Some servers may have been killed
354    * or aborted during lifetime of cluster; these servers are not included in
355    * this list).
356    */
357   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
358     List<JVMClusterUtil.MasterThread> liveServers =
359       new ArrayList<JVMClusterUtil.MasterThread>();
360     List<JVMClusterUtil.MasterThread> list = getMasters();
361     for (JVMClusterUtil.MasterThread mt: list) {
362       if (mt.isAlive()) {
363         liveServers.add(mt);
364       }
365     }
366     return liveServers;
367   }
368 
369   /**
370    * Wait for the specified master to stop
371    * Removes this thread from list of running threads.
372    * @param serverNumber
373    * @return Name of master that just went down.
374    */
375   public String waitOnMaster(int serverNumber) {
376     JVMClusterUtil.MasterThread masterThread = this.masterThreads.remove(serverNumber);
377     while (masterThread.isAlive()) {
378       try {
379         LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
380         masterThread.join();
381       } catch (InterruptedException e) {
382         e.printStackTrace();
383       }
384     }
385     return masterThread.getName();
386   }
387 
388   /**
389    * Wait for the specified master to stop
390    * Removes this thread from list of running threads.
391    * @param masterThread
392    * @return Name of master that just went down.
393    */
394   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
395     while (masterThread.isAlive()) {
396       try {
397         LOG.info("Waiting on " +
398           masterThread.getMaster().getServerName().toString());
399         masterThread.join();
400       } catch (InterruptedException e) {
401         e.printStackTrace();
402       }
403     }
404     for (int i=0;i<masterThreads.size();i++) {
405       if (masterThreads.get(i) == masterThread) {
406         masterThreads.remove(i);
407         break;
408       }
409     }
410     return masterThread.getName();
411   }
412 
413   /**
414    * Wait for Mini HBase Cluster to shut down.
415    * Presumes you've already called {@link #shutdown()}.
416    */
417   public void join() {
418     if (this.regionThreads != null) {
419       for(Thread t: this.regionThreads) {
420         if (t.isAlive()) {
421           try {
422             Threads.threadDumpingIsAlive(t);
423           } catch (InterruptedException e) {
424             LOG.debug("Interrupted", e);
425           }
426         }
427       }
428     }
429     if (this.masterThreads != null) {
430       for (Thread t : this.masterThreads) {
431         if (t.isAlive()) {
432           try {
433             Threads.threadDumpingIsAlive(t);
434           } catch (InterruptedException e) {
435             LOG.debug("Interrupted", e);
436           }
437         }
438       }
439     }
440   }
441 
442   /**
443    * Start the cluster.
444    */
445   public void startup() throws IOException {
446     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
447   }
448 
449   /**
450    * Shut down the mini HBase cluster
451    */
452   public void shutdown() {
453     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
454   }
455 
456   /**
457    * @param c Configuration to check.
458    * @return True if a 'local' address in hbase.master value.
459    */
460   public static boolean isLocal(final Configuration c) {
461     boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
462     return(mode == HConstants.CLUSTER_IS_LOCAL);
463   }
464 
465   /**
466    * Test things basically work.
467    * @param args
468    * @throws IOException
469    */
470   public static void main(String[] args) throws IOException {
471     Configuration conf = HBaseConfiguration.create();
472     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
473     cluster.startup();
474     Connection connection = ConnectionFactory.createConnection(conf);
475     Admin admin = connection.getAdmin();
476     try {
477       HTableDescriptor htd =
478         new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
479       admin.createTable(htd);
480     } finally {
481       admin.close();
482     }
483     connection.close();
484     cluster.shutdown();
485   }
486 }