1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.regionserver.HRegionServer;
35 import org.apache.hadoop.hbase.security.User;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37 import org.apache.hadoop.hbase.util.Threads;
38
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import org.apache.hadoop.hbase.master.HMaster;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Public
60 @InterfaceStability.Evolving
61 public class LocalHBaseCluster {
62 private static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63 private final List<JVMClusterUtil.MasterThread> masterThreads =
64 new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65 private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66 new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67 private final static int DEFAULT_NO = 1;
68
69 public static final String LOCAL = "local";
70
71 public static final String LOCAL_COLON = LOCAL + ":";
72 private final Configuration conf;
73 private final Class<? extends HMaster> masterClass;
74 private final Class<? extends HRegionServer> regionServerClass;
75
76
77
78
79
80
81 public LocalHBaseCluster(final Configuration conf)
82 throws IOException {
83 this(conf, DEFAULT_NO);
84 }
85
86
87
88
89
90
91
92
93 public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94 throws IOException {
95 this(conf, 1, noRegionServers, getMasterImplementation(conf),
96 getRegionServerImplementation(conf));
97 }
98
99
100
101
102
103
104
105
106
107 public LocalHBaseCluster(final Configuration conf, final int noMasters,
108 final int noRegionServers)
109 throws IOException {
110 this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111 getRegionServerImplementation(conf));
112 }
113
114 @SuppressWarnings("unchecked")
115 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117 HRegionServer.class);
118 }
119
120 @SuppressWarnings("unchecked")
121 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123 HMaster.class);
124 }
125
126
127
128
129
130
131
132
133
134
135
136 @SuppressWarnings("unchecked")
137 public LocalHBaseCluster(final Configuration conf, final int noMasters,
138 final int noRegionServers, final Class<? extends HMaster> masterClass,
139 final Class<? extends HRegionServer> regionServerClass)
140 throws IOException {
141 this.conf = conf;
142
143
144
145 conf.set(HConstants.MASTER_PORT, "0");
146 conf.set(HConstants.REGIONSERVER_PORT, "0");
147 if (conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1) {
148 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
149 }
150
151 this.masterClass = (Class<? extends HMaster>)
152 conf.getClass(HConstants.MASTER_IMPL, masterClass);
153
154 for (int i = 0; i < noMasters; i++) {
155 addMaster(new Configuration(conf), i);
156 }
157
158 this.regionServerClass =
159 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
160 regionServerClass);
161
162 for (int i = 0; i < noRegionServers; i++) {
163 addRegionServer(new Configuration(conf), i);
164 }
165 }
166
167 public JVMClusterUtil.RegionServerThread addRegionServer()
168 throws IOException {
169 return addRegionServer(new Configuration(conf), this.regionThreads.size());
170 }
171
172 @SuppressWarnings("unchecked")
173 public JVMClusterUtil.RegionServerThread addRegionServer(
174 Configuration config, final int index)
175 throws IOException {
176
177
178
179
180
181
182
183 CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
184
185 JVMClusterUtil.RegionServerThread rst =
186 JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
187 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
188
189 this.regionThreads.add(rst);
190 return rst;
191 }
192
193 public JVMClusterUtil.RegionServerThread addRegionServer(
194 final Configuration config, final int index, User user)
195 throws IOException, InterruptedException {
196 return user.runAs(
197 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
198 @Override
199 public JVMClusterUtil.RegionServerThread run() throws Exception {
200 return addRegionServer(config, index);
201 }
202 });
203 }
204
205 public JVMClusterUtil.MasterThread addMaster() throws IOException {
206 return addMaster(new Configuration(conf), this.masterThreads.size());
207 }
208
209 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
210 throws IOException {
211
212
213
214
215
216
217
218 CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
219
220 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
221 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
222 this.masterThreads.add(mt);
223 return mt;
224 }
225
226 public JVMClusterUtil.MasterThread addMaster(
227 final Configuration c, final int index, User user)
228 throws IOException, InterruptedException {
229 return user.runAs(
230 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
231 @Override
232 public JVMClusterUtil.MasterThread run() throws Exception {
233 return addMaster(c, index);
234 }
235 });
236 }
237
238
239
240
241
242 public HRegionServer getRegionServer(int serverNumber) {
243 return regionThreads.get(serverNumber).getRegionServer();
244 }
245
246
247
248
249 public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
250 return Collections.unmodifiableList(this.regionThreads);
251 }
252
253
254
255
256
257
258 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
259 List<JVMClusterUtil.RegionServerThread> liveServers =
260 new ArrayList<JVMClusterUtil.RegionServerThread>();
261 List<RegionServerThread> list = getRegionServers();
262 for (JVMClusterUtil.RegionServerThread rst: list) {
263 if (rst.isAlive()) liveServers.add(rst);
264 else LOG.info("Not alive " + rst.getName());
265 }
266 return liveServers;
267 }
268
269
270
271
272 public Configuration getConfiguration() {
273 return this.conf;
274 }
275
276
277
278
279
280
281
282 public String waitOnRegionServer(int serverNumber) {
283 JVMClusterUtil.RegionServerThread regionServerThread =
284 this.regionThreads.remove(serverNumber);
285 while (regionServerThread.isAlive()) {
286 try {
287 LOG.info("Waiting on " +
288 regionServerThread.getRegionServer().toString());
289 regionServerThread.join();
290 } catch (InterruptedException e) {
291 e.printStackTrace();
292 }
293 }
294 return regionServerThread.getName();
295 }
296
297
298
299
300
301
302
303 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
304 while (rst.isAlive()) {
305 try {
306 LOG.info("Waiting on " +
307 rst.getRegionServer().toString());
308 rst.join();
309 } catch (InterruptedException e) {
310 e.printStackTrace();
311 }
312 }
313 for (int i=0;i<regionThreads.size();i++) {
314 if (regionThreads.get(i) == rst) {
315 regionThreads.remove(i);
316 break;
317 }
318 }
319 return rst.getName();
320 }
321
322
323
324
325
326 public HMaster getMaster(int serverNumber) {
327 return masterThreads.get(serverNumber).getMaster();
328 }
329
330
331
332
333
334
335 public HMaster getActiveMaster() {
336 for (JVMClusterUtil.MasterThread mt : masterThreads) {
337
338
339 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
340 return mt.getMaster();
341 }
342 }
343 return null;
344 }
345
346
347
348
349 public List<JVMClusterUtil.MasterThread> getMasters() {
350 return Collections.unmodifiableList(this.masterThreads);
351 }
352
353
354
355
356
357
358 public List<JVMClusterUtil.MasterThread> getLiveMasters() {
359 List<JVMClusterUtil.MasterThread> liveServers =
360 new ArrayList<JVMClusterUtil.MasterThread>();
361 List<JVMClusterUtil.MasterThread> list = getMasters();
362 for (JVMClusterUtil.MasterThread mt: list) {
363 if (mt.isAlive()) {
364 liveServers.add(mt);
365 }
366 }
367 return liveServers;
368 }
369
370
371
372
373
374
375
376 public String waitOnMaster(int serverNumber) {
377 JVMClusterUtil.MasterThread masterThread = this.masterThreads.remove(serverNumber);
378 while (masterThread.isAlive()) {
379 try {
380 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
381 masterThread.join();
382 } catch (InterruptedException e) {
383 e.printStackTrace();
384 }
385 }
386 return masterThread.getName();
387 }
388
389
390
391
392
393
394
395 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
396 while (masterThread.isAlive()) {
397 try {
398 LOG.info("Waiting on " +
399 masterThread.getMaster().getServerName().toString());
400 masterThread.join();
401 } catch (InterruptedException e) {
402 e.printStackTrace();
403 }
404 }
405 for (int i=0;i<masterThreads.size();i++) {
406 if (masterThreads.get(i) == masterThread) {
407 masterThreads.remove(i);
408 break;
409 }
410 }
411 return masterThread.getName();
412 }
413
414
415
416
417
418 public void join() {
419 if (this.regionThreads != null) {
420 for(Thread t: this.regionThreads) {
421 if (t.isAlive()) {
422 try {
423 Threads.threadDumpingIsAlive(t);
424 } catch (InterruptedException e) {
425 LOG.debug("Interrupted", e);
426 }
427 }
428 }
429 }
430 if (this.masterThreads != null) {
431 for (Thread t : this.masterThreads) {
432 if (t.isAlive()) {
433 try {
434 Threads.threadDumpingIsAlive(t);
435 } catch (InterruptedException e) {
436 LOG.debug("Interrupted", e);
437 }
438 }
439 }
440 }
441 }
442
443
444
445
446 public void startup() throws IOException {
447 JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
448 }
449
450
451
452
453 public void shutdown() {
454 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
455 }
456
457
458
459
460
461 public static boolean isLocal(final Configuration c) {
462 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
463 return(mode == HConstants.CLUSTER_IS_LOCAL);
464 }
465
466
467
468
469
470
471 public static void main(String[] args) throws IOException {
472 Configuration conf = HBaseConfiguration.create();
473 LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
474 cluster.startup();
475 Admin admin = new HBaseAdmin(conf);
476 try {
477 HTableDescriptor htd =
478 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
479 admin.createTable(htd);
480 } finally {
481 admin.close();
482 }
483 cluster.shutdown();
484 }
485 }