001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.security.PrivilegedExceptionAction; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.regionserver.HRegionServer; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 037import org.apache.hadoop.hbase.util.Threads; 038 039import java.util.concurrent.CopyOnWriteArrayList; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.util.JVMClusterUtil; 042 043/** 044 * This class creates a single process HBase cluster. One thread is created for 045 * a master and one per region server. 046 * 047 * Call {@link #startup()} to start the cluster running and {@link #shutdown()} 048 * to close it all down. {@link #join} the cluster is you want to wait on 049 * shutdown completion. 050 * 051 * <p>Runs master on port 16000 by default. Because we can't just kill the 052 * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to 053 * be able to find the master with a remote client to run shutdown. To use a 054 * port other than 16000, set the hbase.master to a value of 'local:PORT': 055 * that is 'local', not 'localhost', and the port number the master should use 056 * instead of 16000. 057 * 058 */ 059@InterfaceAudience.Public 060public class LocalHBaseCluster { 061 private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class); 062 private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>(); 063 private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>(); 064 private final static int DEFAULT_NO = 1; 065 /** local mode */ 066 public static final String LOCAL = "local"; 067 /** 'local:' */ 068 public static final String LOCAL_COLON = LOCAL + ":"; 069 public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports"; 070 071 private final Configuration conf; 072 private final Class<? extends HMaster> masterClass; 073 private final Class<? extends HRegionServer> regionServerClass; 074 075 /** 076 * Constructor. 077 * @param conf 078 * @throws IOException 079 */ 080 public LocalHBaseCluster(final Configuration conf) 081 throws IOException { 082 this(conf, DEFAULT_NO); 083 } 084 085 /** 086 * Constructor. 087 * @param conf Configuration to use. Post construction has the master's 088 * address. 089 * @param noRegionServers Count of regionservers to start. 090 * @throws IOException 091 */ 092 public LocalHBaseCluster(final Configuration conf, final int noRegionServers) 093 throws IOException { 094 this(conf, 1, 0, noRegionServers, getMasterImplementation(conf), 095 getRegionServerImplementation(conf)); 096 } 097 098 /** 099 * Constructor. 100 * @param conf Configuration to use. Post construction has the active master 101 * address. 102 * @param noMasters Count of masters to start. 103 * @param noRegionServers Count of regionservers to start. 104 * @throws IOException 105 */ 106 public LocalHBaseCluster(final Configuration conf, final int noMasters, 107 final int noRegionServers) 108 throws IOException { 109 this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf), 110 getRegionServerImplementation(conf)); 111 } 112 113 @SuppressWarnings("unchecked") 114 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) { 115 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL, 116 HRegionServer.class); 117 } 118 119 @SuppressWarnings("unchecked") 120 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) { 121 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL, 122 HMaster.class); 123 } 124 125 public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers, 126 final Class<? extends HMaster> masterClass, 127 final Class<? extends HRegionServer> regionServerClass) throws IOException { 128 this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass); 129 } 130 131 /** 132 * Constructor. 133 * @param conf Configuration to use. Post construction has the master's 134 * address. 135 * @param noMasters Count of masters to start. 136 * @param noRegionServers Count of regionservers to start. 137 * @param masterClass 138 * @param regionServerClass 139 * @throws IOException 140 */ 141 @SuppressWarnings("unchecked") 142 public LocalHBaseCluster(final Configuration conf, final int noMasters, 143 final int noAlwaysStandByMasters, final int noRegionServers, 144 final Class<? extends HMaster> masterClass, 145 final Class<? extends HRegionServer> regionServerClass) throws IOException { 146 this.conf = conf; 147 148 // When active, if a port selection is default then we switch to random 149 if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) { 150 if (conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT) 151 == HConstants.DEFAULT_MASTER_PORT) { 152 LOG.debug("Setting Master Port to random."); 153 conf.set(HConstants.MASTER_PORT, "0"); 154 } 155 if (conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT) 156 == HConstants.DEFAULT_REGIONSERVER_PORT) { 157 LOG.debug("Setting RegionServer Port to random."); 158 conf.set(HConstants.REGIONSERVER_PORT, "0"); 159 } 160 // treat info ports special; expressly don't change '-1' (keep off) 161 // in case we make that the default behavior. 162 if (conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1 && 163 conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT) 164 == HConstants.DEFAULT_REGIONSERVER_INFOPORT) { 165 LOG.debug("Setting RS InfoServer Port to random."); 166 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); 167 } 168 if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 && 169 conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT) 170 == HConstants.DEFAULT_MASTER_INFOPORT) { 171 LOG.debug("Setting Master InfoServer Port to random."); 172 conf.set(HConstants.MASTER_INFO_PORT, "0"); 173 } 174 } 175 176 this.masterClass = (Class<? extends HMaster>) 177 conf.getClass(HConstants.MASTER_IMPL, masterClass); 178 // Start the HMasters. 179 int i; 180 for (i = 0; i < noMasters; i++) { 181 addMaster(new Configuration(conf), i); 182 } 183 for (int j = 0; j < noAlwaysStandByMasters; j++) { 184 Configuration c = new Configuration(conf); 185 c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster"); 186 addMaster(c, i + j); 187 } 188 // Start the HRegionServers. 189 this.regionServerClass = 190 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL, 191 regionServerClass); 192 193 for (int j = 0; j < noRegionServers; j++) { 194 addRegionServer(new Configuration(conf), j); 195 } 196 } 197 198 public JVMClusterUtil.RegionServerThread addRegionServer() 199 throws IOException { 200 return addRegionServer(new Configuration(conf), this.regionThreads.size()); 201 } 202 203 @SuppressWarnings("unchecked") 204 public JVMClusterUtil.RegionServerThread addRegionServer( 205 Configuration config, final int index) 206 throws IOException { 207 // Create each regionserver with its own Configuration instance so each has 208 // its Connection instance rather than share (see HBASE_INSTANCES down in 209 // the guts of ConnectionManager). 210 JVMClusterUtil.RegionServerThread rst = 211 JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf 212 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); 213 214 this.regionThreads.add(rst); 215 return rst; 216 } 217 218 public JVMClusterUtil.RegionServerThread addRegionServer( 219 final Configuration config, final int index, User user) 220 throws IOException, InterruptedException { 221 return user.runAs( 222 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() { 223 @Override 224 public JVMClusterUtil.RegionServerThread run() throws Exception { 225 return addRegionServer(config, index); 226 } 227 }); 228 } 229 230 public JVMClusterUtil.MasterThread addMaster() throws IOException { 231 return addMaster(new Configuration(conf), this.masterThreads.size()); 232 } 233 234 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index) 235 throws IOException { 236 // Create each master with its own Configuration instance so each has 237 // its Connection instance rather than share (see HBASE_INSTANCES down in 238 // the guts of ConnectionManager. 239 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, 240 (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index); 241 this.masterThreads.add(mt); 242 // Refresh the master address config. 243 List<String> masterHostPorts = new ArrayList<>(); 244 getMasters().forEach(masterThread -> 245 masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString())); 246 conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts)); 247 return mt; 248 } 249 250 public JVMClusterUtil.MasterThread addMaster( 251 final Configuration c, final int index, User user) 252 throws IOException, InterruptedException { 253 return user.runAs( 254 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() { 255 @Override 256 public JVMClusterUtil.MasterThread run() throws Exception { 257 return addMaster(c, index); 258 } 259 }); 260 } 261 262 /** 263 * @param serverNumber 264 * @return region server 265 */ 266 public HRegionServer getRegionServer(int serverNumber) { 267 return regionThreads.get(serverNumber).getRegionServer(); 268 } 269 270 /** 271 * @return Read-only list of region server threads. 272 */ 273 public List<JVMClusterUtil.RegionServerThread> getRegionServers() { 274 return Collections.unmodifiableList(this.regionThreads); 275 } 276 277 /** 278 * @return List of running servers (Some servers may have been killed or 279 * aborted during lifetime of cluster; these servers are not included in this 280 * list). 281 */ 282 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() { 283 List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>(); 284 List<RegionServerThread> list = getRegionServers(); 285 for (JVMClusterUtil.RegionServerThread rst: list) { 286 if (rst.isAlive()) liveServers.add(rst); 287 else LOG.info("Not alive " + rst.getName()); 288 } 289 return liveServers; 290 } 291 292 /** 293 * @return the Configuration used by this LocalHBaseCluster 294 */ 295 public Configuration getConfiguration() { 296 return this.conf; 297 } 298 299 /** 300 * Wait for the specified region server to stop. Removes this thread from list of running threads. 301 * @return Name of region server that just went down. 302 */ 303 public String waitOnRegionServer(int serverNumber) { 304 JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber); 305 return waitOnRegionServer(regionServerThread); 306 } 307 308 /** 309 * Wait for the specified region server to stop. Removes this thread from list of running threads. 310 * @return Name of region server that just went down. 311 */ 312 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { 313 while (rst.isAlive()) { 314 try { 315 LOG.info("Waiting on " + rst.getRegionServer().toString()); 316 rst.join(); 317 } catch (InterruptedException e) { 318 e.printStackTrace(); 319 } 320 } 321 regionThreads.remove(rst); 322 return rst.getName(); 323 } 324 325 /** 326 * @return the HMaster thread 327 */ 328 public HMaster getMaster(int serverNumber) { 329 return masterThreads.get(serverNumber).getMaster(); 330 } 331 332 /** 333 * Gets the current active master, if available. If no active master, returns 334 * null. 335 * @return the HMaster for the active master 336 */ 337 public HMaster getActiveMaster() { 338 for (JVMClusterUtil.MasterThread mt : masterThreads) { 339 // Ensure that the current active master is not stopped. 340 // We don't want to return a stopping master as an active master. 341 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { 342 return mt.getMaster(); 343 } 344 } 345 return null; 346 } 347 348 /** 349 * @return Read-only list of master threads. 350 */ 351 public List<JVMClusterUtil.MasterThread> getMasters() { 352 return Collections.unmodifiableList(this.masterThreads); 353 } 354 355 /** 356 * @return List of running master servers (Some servers may have been killed 357 * or aborted during lifetime of cluster; these servers are not included in 358 * this list). 359 */ 360 public List<JVMClusterUtil.MasterThread> getLiveMasters() { 361 List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>(); 362 List<JVMClusterUtil.MasterThread> list = getMasters(); 363 for (JVMClusterUtil.MasterThread mt: list) { 364 if (mt.isAlive()) { 365 liveServers.add(mt); 366 } 367 } 368 return liveServers; 369 } 370 371 /** 372 * Wait for the specified master to stop. Removes this thread from list of running threads. 373 * @return Name of master that just went down. 374 */ 375 public String waitOnMaster(int serverNumber) { 376 JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber); 377 return waitOnMaster(masterThread); 378 } 379 380 /** 381 * Wait for the specified master to stop. Removes this thread from list of running threads. 382 * @return Name of master that just went down. 383 */ 384 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) { 385 while (masterThread.isAlive()) { 386 try { 387 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString()); 388 masterThread.join(); 389 } catch (InterruptedException e) { 390 e.printStackTrace(); 391 } 392 } 393 masterThreads.remove(masterThread); 394 return masterThread.getName(); 395 } 396 397 /** 398 * Wait for Mini HBase Cluster to shut down. 399 * Presumes you've already called {@link #shutdown()}. 400 */ 401 public void join() { 402 if (this.regionThreads != null) { 403 for(Thread t: this.regionThreads) { 404 if (t.isAlive()) { 405 try { 406 Threads.threadDumpingIsAlive(t); 407 } catch (InterruptedException e) { 408 LOG.debug("Interrupted", e); 409 } 410 } 411 } 412 } 413 if (this.masterThreads != null) { 414 for (Thread t : this.masterThreads) { 415 if (t.isAlive()) { 416 try { 417 Threads.threadDumpingIsAlive(t); 418 } catch (InterruptedException e) { 419 LOG.debug("Interrupted", e); 420 } 421 } 422 } 423 } 424 } 425 426 /** 427 * Start the cluster. 428 */ 429 public void startup() throws IOException { 430 JVMClusterUtil.startup(this.masterThreads, this.regionThreads); 431 } 432 433 /** 434 * Shut down the mini HBase cluster 435 */ 436 public void shutdown() { 437 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); 438 } 439 440 /** 441 * @param c Configuration to check. 442 * @return True if a 'local' address in hbase.master value. 443 */ 444 public static boolean isLocal(final Configuration c) { 445 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); 446 return(mode == HConstants.CLUSTER_IS_LOCAL); 447 } 448 449 /** 450 * Test things basically work. 451 * @param args 452 * @throws IOException 453 */ 454 public static void main(String[] args) throws IOException { 455 Configuration conf = HBaseConfiguration.create(); 456 LocalHBaseCluster cluster = new LocalHBaseCluster(conf); 457 cluster.startup(); 458 Connection connection = ConnectionFactory.createConnection(conf); 459 Admin admin = connection.getAdmin(); 460 try { 461 HTableDescriptor htd = 462 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName())); 463 admin.createTable(htd); 464 } finally { 465 admin.close(); 466 } 467 connection.close(); 468 cluster.shutdown(); 469 } 470}