001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedExceptionAction; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.regionserver.HRegionServer; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 037import org.apache.hadoop.hbase.util.Threads; 038 039import java.util.concurrent.CopyOnWriteArrayList; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.util.JVMClusterUtil; 042 043/** 044 * This class creates a single process HBase cluster. One thread is created for 045 * a master and one per region server. 046 * 047 * Call {@link #startup()} to start the cluster running and {@link #shutdown()} 048 * to close it all down. {@link #join} the cluster is you want to wait on 049 * shutdown completion. 050 * 051 * <p>Runs master on port 16000 by default. Because we can't just kill the 052 * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to 053 * be able to find the master with a remote client to run shutdown. To use a 054 * port other than 16000, set the hbase.master to a value of 'local:PORT': 055 * that is 'local', not 'localhost', and the port number the master should use 056 * instead of 16000. 057 * 058 */ 059@InterfaceAudience.Public 060public class LocalHBaseCluster { 061 private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class); 062 private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>(); 063 private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>(); 064 private final static int DEFAULT_NO = 1; 065 /** local mode */ 066 public static final String LOCAL = "local"; 067 /** 'local:' */ 068 public static final String LOCAL_COLON = LOCAL + ":"; 069 public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports"; 070 071 private final Configuration conf; 072 private final Class<? extends HMaster> masterClass; 073 private final Class<? extends HRegionServer> regionServerClass; 074 075 /** 076 * Constructor. 077 * @param conf 078 * @throws IOException 079 */ 080 public LocalHBaseCluster(final Configuration conf) 081 throws IOException { 082 this(conf, DEFAULT_NO); 083 } 084 085 /** 086 * Constructor. 087 * @param conf Configuration to use. Post construction has the master's 088 * address. 089 * @param noRegionServers Count of regionservers to start. 090 * @throws IOException 091 */ 092 public LocalHBaseCluster(final Configuration conf, final int noRegionServers) 093 throws IOException { 094 this(conf, 1, noRegionServers, getMasterImplementation(conf), 095 getRegionServerImplementation(conf)); 096 } 097 098 /** 099 * Constructor. 100 * @param conf Configuration to use. Post construction has the active master 101 * address. 102 * @param noMasters Count of masters to start. 103 * @param noRegionServers Count of regionservers to start. 104 * @throws IOException 105 */ 106 public LocalHBaseCluster(final Configuration conf, final int noMasters, 107 final int noRegionServers) 108 throws IOException { 109 this(conf, noMasters, noRegionServers, getMasterImplementation(conf), 110 getRegionServerImplementation(conf)); 111 } 112 113 @SuppressWarnings("unchecked") 114 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) { 115 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL, 116 HRegionServer.class); 117 } 118 119 @SuppressWarnings("unchecked") 120 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) { 121 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL, 122 HMaster.class); 123 } 124 125 /** 126 * Constructor. 127 * @param conf Configuration to use. Post construction has the master's 128 * address. 129 * @param noMasters Count of masters to start. 130 * @param noRegionServers Count of regionservers to start. 131 * @param masterClass 132 * @param regionServerClass 133 * @throws IOException 134 */ 135 @SuppressWarnings("unchecked") 136 public LocalHBaseCluster(final Configuration conf, final int noMasters, 137 final int noRegionServers, final Class<? extends HMaster> masterClass, 138 final Class<? extends HRegionServer> regionServerClass) 139 throws IOException { 140 this.conf = conf; 141 142 // When active, if a port selection is default then we switch to random 143 if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) { 144 if (conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT) 145 == HConstants.DEFAULT_MASTER_PORT) { 146 LOG.debug("Setting Master Port to random."); 147 conf.set(HConstants.MASTER_PORT, "0"); 148 } 149 if (conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT) 150 == HConstants.DEFAULT_REGIONSERVER_PORT) { 151 LOG.debug("Setting RegionServer Port to random."); 152 conf.set(HConstants.REGIONSERVER_PORT, "0"); 153 } 154 // treat info ports special; expressly don't change '-1' (keep off) 155 // in case we make that the default behavior. 156 if (conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1 && 157 conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT) 158 == HConstants.DEFAULT_REGIONSERVER_INFOPORT) { 159 LOG.debug("Setting RS InfoServer Port to random."); 160 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); 161 } 162 if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 && 163 conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT) 164 == HConstants.DEFAULT_MASTER_INFOPORT) { 165 LOG.debug("Setting Master InfoServer Port to random."); 166 conf.set(HConstants.MASTER_INFO_PORT, "0"); 167 } 168 } 169 170 this.masterClass = (Class<? extends HMaster>) 171 conf.getClass(HConstants.MASTER_IMPL, masterClass); 172 // Start the HMasters. 173 for (int i = 0; i < noMasters; i++) { 174 addMaster(new Configuration(conf), i); 175 } 176 // Start the HRegionServers. 177 this.regionServerClass = 178 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL, 179 regionServerClass); 180 181 for (int i = 0; i < noRegionServers; i++) { 182 addRegionServer(new Configuration(conf), i); 183 } 184 } 185 186 public JVMClusterUtil.RegionServerThread addRegionServer() 187 throws IOException { 188 return addRegionServer(new Configuration(conf), this.regionThreads.size()); 189 } 190 191 @SuppressWarnings("unchecked") 192 public JVMClusterUtil.RegionServerThread addRegionServer( 193 Configuration config, final int index) 194 throws IOException { 195 // Create each regionserver with its own Configuration instance so each has 196 // its Connection instance rather than share (see HBASE_INSTANCES down in 197 // the guts of ConnectionManager). 198 JVMClusterUtil.RegionServerThread rst = 199 JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf 200 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); 201 202 this.regionThreads.add(rst); 203 return rst; 204 } 205 206 public JVMClusterUtil.RegionServerThread addRegionServer( 207 final Configuration config, final int index, User user) 208 throws IOException, InterruptedException { 209 return user.runAs( 210 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() { 211 @Override 212 public JVMClusterUtil.RegionServerThread run() throws Exception { 213 return addRegionServer(config, index); 214 } 215 }); 216 } 217 218 public JVMClusterUtil.MasterThread addMaster() throws IOException { 219 return addMaster(new Configuration(conf), this.masterThreads.size()); 220 } 221 222 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index) 223 throws IOException { 224 // Create each master with its own Configuration instance so each has 225 // its Connection instance rather than share (see HBASE_INSTANCES down in 226 // the guts of ConnectionManager. 227 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, 228 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); 229 this.masterThreads.add(mt); 230 return mt; 231 } 232 233 public JVMClusterUtil.MasterThread addMaster( 234 final Configuration c, final int index, User user) 235 throws IOException, InterruptedException { 236 return user.runAs( 237 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() { 238 @Override 239 public JVMClusterUtil.MasterThread run() throws Exception { 240 return addMaster(c, index); 241 } 242 }); 243 } 244 245 /** 246 * @param serverNumber 247 * @return region server 248 */ 249 public HRegionServer getRegionServer(int serverNumber) { 250 return regionThreads.get(serverNumber).getRegionServer(); 251 } 252 253 /** 254 * @return Read-only list of region server threads. 255 */ 256 public List<JVMClusterUtil.RegionServerThread> getRegionServers() { 257 return Collections.unmodifiableList(this.regionThreads); 258 } 259 260 /** 261 * @return List of running servers (Some servers may have been killed or 262 * aborted during lifetime of cluster; these servers are not included in this 263 * list). 264 */ 265 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() { 266 List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>(); 267 List<RegionServerThread> list = getRegionServers(); 268 for (JVMClusterUtil.RegionServerThread rst: list) { 269 if (rst.isAlive()) liveServers.add(rst); 270 else LOG.info("Not alive " + rst.getName()); 271 } 272 return liveServers; 273 } 274 275 /** 276 * @return the Configuration used by this LocalHBaseCluster 277 */ 278 public Configuration getConfiguration() { 279 return this.conf; 280 } 281 282 /** 283 * Wait for the specified region server to stop. Removes this thread from list of running threads. 284 * @return Name of region server that just went down. 285 */ 286 public String waitOnRegionServer(int serverNumber) { 287 JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber); 288 return waitOnRegionServer(regionServerThread); 289 } 290 291 /** 292 * Wait for the specified region server to stop. Removes this thread from list of running threads. 293 * @return Name of region server that just went down. 294 */ 295 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { 296 while (rst.isAlive()) { 297 try { 298 LOG.info("Waiting on " + rst.getRegionServer().toString()); 299 rst.join(); 300 } catch (InterruptedException e) { 301 e.printStackTrace(); 302 } 303 } 304 regionThreads.remove(rst); 305 return rst.getName(); 306 } 307 308 /** 309 * @return the HMaster thread 310 */ 311 public HMaster getMaster(int serverNumber) { 312 return masterThreads.get(serverNumber).getMaster(); 313 } 314 315 /** 316 * Gets the current active master, if available. If no active master, returns 317 * null. 318 * @return the HMaster for the active master 319 */ 320 public HMaster getActiveMaster() { 321 for (JVMClusterUtil.MasterThread mt : masterThreads) { 322 // Ensure that the current active master is not stopped. 323 // We don't want to return a stopping master as an active master. 324 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { 325 return mt.getMaster(); 326 } 327 } 328 return null; 329 } 330 331 /** 332 * @return Read-only list of master threads. 333 */ 334 public List<JVMClusterUtil.MasterThread> getMasters() { 335 return Collections.unmodifiableList(this.masterThreads); 336 } 337 338 /** 339 * @return List of running master servers (Some servers may have been killed 340 * or aborted during lifetime of cluster; these servers are not included in 341 * this list). 342 */ 343 public List<JVMClusterUtil.MasterThread> getLiveMasters() { 344 List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>(); 345 List<JVMClusterUtil.MasterThread> list = getMasters(); 346 for (JVMClusterUtil.MasterThread mt: list) { 347 if (mt.isAlive()) { 348 liveServers.add(mt); 349 } 350 } 351 return liveServers; 352 } 353 354 /** 355 * Wait for the specified master to stop. Removes this thread from list of running threads. 356 * @return Name of master that just went down. 357 */ 358 public String waitOnMaster(int serverNumber) { 359 JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber); 360 return waitOnMaster(masterThread); 361 } 362 363 /** 364 * Wait for the specified master to stop. Removes this thread from list of running threads. 365 * @return Name of master that just went down. 366 */ 367 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) { 368 while (masterThread.isAlive()) { 369 try { 370 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString()); 371 masterThread.join(); 372 } catch (InterruptedException e) { 373 e.printStackTrace(); 374 } 375 } 376 masterThreads.remove(masterThread); 377 return masterThread.getName(); 378 } 379 380 /** 381 * Wait for Mini HBase Cluster to shut down. 382 * Presumes you've already called {@link #shutdown()}. 383 */ 384 public void join() { 385 if (this.regionThreads != null) { 386 for(Thread t: this.regionThreads) { 387 if (t.isAlive()) { 388 try { 389 Threads.threadDumpingIsAlive(t); 390 } catch (InterruptedException e) { 391 LOG.debug("Interrupted", e); 392 } 393 } 394 } 395 } 396 if (this.masterThreads != null) { 397 for (Thread t : this.masterThreads) { 398 if (t.isAlive()) { 399 try { 400 Threads.threadDumpingIsAlive(t); 401 } catch (InterruptedException e) { 402 LOG.debug("Interrupted", e); 403 } 404 } 405 } 406 } 407 } 408 409 /** 410 * Start the cluster. 411 */ 412 public void startup() throws IOException { 413 JVMClusterUtil.startup(this.masterThreads, this.regionThreads); 414 } 415 416 /** 417 * Shut down the mini HBase cluster 418 */ 419 public void shutdown() { 420 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); 421 } 422 423 /** 424 * @param c Configuration to check. 425 * @return True if a 'local' address in hbase.master value. 426 */ 427 public static boolean isLocal(final Configuration c) { 428 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); 429 return(mode == HConstants.CLUSTER_IS_LOCAL); 430 } 431 432 /** 433 * Test things basically work. 434 * @param args 435 * @throws IOException 436 */ 437 public static void main(String[] args) throws IOException { 438 Configuration conf = HBaseConfiguration.create(); 439 LocalHBaseCluster cluster = new LocalHBaseCluster(conf); 440 cluster.startup(); 441 Connection connection = ConnectionFactory.createConnection(conf); 442 Admin admin = connection.getAdmin(); 443 try { 444 HTableDescriptor htd = 445 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName())); 446 admin.createTable(htd); 447 } finally { 448 admin.close(); 449 } 450 connection.close(); 451 cluster.shutdown(); 452 } 453}