/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class creates a single process HBase cluster. One thread is created for a master and one per
 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to
 * close it all down. {@link #join} the cluster if you want to wait on shutdown completion.
 * <p>
 * Runs master on port 16000 by default. Because we can't just kill the process -- not till
 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote
 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of
 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead
 * of 16000.
 */
@InterfaceAudience.Private
public class LocalHBaseCluster {
  private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
  private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
    new CopyOnWriteArrayList<>();
  /** Region server count used by {@link #LocalHBaseCluster(Configuration)}. */
  private static final int DEFAULT_NO = 1;
  /** local mode */
  public static final String LOCAL = "local";
  /** 'local:' */
  public static final String LOCAL_COLON = LOCAL + ":";
  /** Boolean configuration key; when true, ports still at their default value are set to "0". */
  public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports";

  private final Configuration conf;
  private final Class<? extends HMaster> masterClass;
  private final Class<? extends HRegionServer> regionServerClass;

  /**
   * Constructor. Creates one master and {@value #DEFAULT_NO} region server.
   * @param conf Configuration to use.
   */
  public LocalHBaseCluster(final Configuration conf) throws IOException {
    this(conf, DEFAULT_NO);
  }

  /**
   * Constructor. Creates a single master.
   * @param conf            Configuration to use. Post construction has the master's address.
   * @param noRegionServers Count of regionservers to start.
   */
  public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException {
    this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
      getRegionServerImplementation(conf));
  }

  /**
   * Constructor.
   * @param conf            Configuration to use. Post construction has the active master address.
   * @param noMasters       Count of masters to start.
   * @param noRegionServers Count of regionservers to start.
   */
  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers)
    throws IOException {
    this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
      getRegionServerImplementation(conf));
  }

  /**
   * Returns the class configured under {@link HConstants#REGION_SERVER_IMPL}, falling back to
   * {@link HRegionServer}.
   */
  @SuppressWarnings("unchecked")
  private static Class<? extends HRegionServer>
    getRegionServerImplementation(final Configuration conf) {
    return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
      HRegionServer.class);
  }

  /**
   * Returns the class configured under {@link HConstants#MASTER_IMPL}, falling back to
   * {@link HMaster}.
   */
  @SuppressWarnings("unchecked")
  private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
    return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class);
  }

  /**
   * Constructor. Creates no always-standby masters.
   * @param conf              Configuration to use. Post construction has the master's address.
   * @param noMasters         Count of masters to start.
   * @param noRegionServers   Count of regionservers to start.
   * @param masterClass       Master implementation used when {@link HConstants#MASTER_IMPL} is not
   *                          set in {@code conf}.
   * @param regionServerClass Region server implementation used when
   *                          {@link HConstants#REGION_SERVER_IMPL} is not set in {@code conf}.
   */
  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
    final Class<? extends HMaster> masterClass,
    final Class<? extends HRegionServer> regionServerClass) throws IOException {
    this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
  }

  /**
   * Constructor.
   * @param conf                   Configuration to use. Post construction has the master's
   *                               address. When {@link #ASSIGN_RANDOM_PORTS} is true, port settings
   *                               in this instance that are still at their default are rewritten
   *                               to "0".
   * @param noMasters              Count of masters to start.
   * @param noAlwaysStandByMasters Count of additional masters to create with
   *                               {@link HConstants#MASTER_IMPL} set to
   *                               {@code org.apache.hadoop.hbase.master.AlwaysStandByHMaster}.
   * @param noRegionServers        Count of regionservers to start.
   * @param masterClass            Master implementation used when {@link HConstants#MASTER_IMPL}
   *                               is not set in {@code conf}.
   * @param regionServerClass      Region server implementation used when
   *                               {@link HConstants#REGION_SERVER_IMPL} is not set in
   *                               {@code conf}.
   */
  @SuppressWarnings("unchecked")
  public LocalHBaseCluster(final Configuration conf, final int noMasters,
    final int noAlwaysStandByMasters, final int noRegionServers,
    final Class<? extends HMaster> masterClass,
    final Class<? extends HRegionServer> regionServerClass) throws IOException {
    this.conf = conf;

    // When active, if a port selection is default then we switch to random
    if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) {
      randomizeDefaultPorts(conf);
    }

    this.masterClass =
      (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass);
    // Start the HMasters.
    for (int i = 0; i < noMasters; i++) {
      addMaster(new Configuration(conf), i);
    }
    // Always-standby masters take the indexes following the regular masters.
    for (int j = 0; j < noAlwaysStandByMasters; j++) {
      Configuration c = new Configuration(conf);
      c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
      addMaster(c, noMasters + j);
    }
    // Start the HRegionServers.
    this.regionServerClass = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass);

    for (int j = 0; j < noRegionServers; j++) {
      addRegionServer(new Configuration(conf), j);
    }
  }

  /**
   * Sets each master/regionserver RPC and info port in {@code conf} to "0" when it currently
   * resolves to its default value.
   */
  private static void randomizeDefaultPorts(final Configuration conf) {
    if (
      conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)
          == HConstants.DEFAULT_MASTER_PORT
    ) {
      LOG.debug("Setting Master Port to random.");
      conf.set(HConstants.MASTER_PORT, "0");
    }
    if (
      conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT)
          == HConstants.DEFAULT_REGIONSERVER_PORT
    ) {
      LOG.debug("Setting RegionServer Port to random.");
      conf.set(HConstants.REGIONSERVER_PORT, "0");
    }
    // treat info ports special; expressly don't change '-1' (keep off)
    // in case we make that the default behavior.
    if (
      conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1
        && conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
          HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT
    ) {
      LOG.debug("Setting RS InfoServer Port to random.");
      conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
    }
    if (
      conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1
        && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
            == HConstants.DEFAULT_MASTER_INFOPORT
    ) {
      LOG.debug("Setting Master InfoServer Port to random.");
      conf.set(HConstants.MASTER_INFO_PORT, "0");
    }
  }

  /**
   * Adds a region server using a copy of this cluster's configuration and the next free index.
   * @return the newly created (not yet started by this call) region server thread
   */
  public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
    return addRegionServer(new Configuration(conf), this.regionThreads.size());
  }

  /**
   * Creates a region server thread and registers it with this cluster.
   * @param config Configuration handed to the new region server.
   * @param index  Index passed to {@link JVMClusterUtil#createRegionServerThread}.
   * @return the newly created region server thread
   */
  @SuppressWarnings("unchecked")
  public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index)
    throws IOException {
    // Create each regionserver with its own Configuration instance so each has
    // its Connection instance rather than share (see HBASE_INSTANCES down in
    // the guts of ConnectionManager).
    // NOTE: the implementation class is read from the cluster-wide conf, not from 'config'.
    Class<? extends HRegionServer> impl = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass);
    JVMClusterUtil.RegionServerThread rst =
      JVMClusterUtil.createRegionServerThread(config, impl, index);

    this.regionThreads.add(rst);
    return rst;
  }

  /**
   * Same as {@link #addRegionServer(Configuration, int)} but executed via
   * {@link User#runAs(PrivilegedExceptionAction)} on the given user.
   */
  public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config,
    final int index, User user) throws IOException, InterruptedException {
    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
      @Override
      public JVMClusterUtil.RegionServerThread run() throws Exception {
        return addRegionServer(config, index);
      }
    });
  }

  /**
   * Adds a master using a copy of this cluster's configuration and the next free index.
   * @return the newly created master thread
   */
  public JVMClusterUtil.MasterThread addMaster() throws IOException {
    return addMaster(new Configuration(conf), this.masterThreads.size());
  }

  /**
   * Creates a master thread, registers it with this cluster and rewrites
   * {@link HConstants#MASTER_ADDRS_KEY} in the cluster configuration with the addresses of all
   * registered masters.
   * @param c     Configuration handed to the new master; its {@link HConstants#MASTER_IMPL}
   *              selects the implementation class.
   * @param index Index passed to {@link JVMClusterUtil#createMasterThread}.
   * @return the newly created master thread
   */
  @SuppressWarnings("unchecked")
  public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
    throws IOException {
    // Create each master with its own Configuration instance so each has
    // its Connection instance rather than share (see HBASE_INSTANCES down in
    // the guts of ConnectionManager).
    Class<? extends HMaster> impl =
      (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass);
    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, impl, index);
    this.masterThreads.add(mt);
    // Refresh the master address config.
    List<String> masterHostPorts = new ArrayList<>();
    for (JVMClusterUtil.MasterThread masterThread : getMasters()) {
      masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString());
    }
    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
    return mt;
  }

  /**
   * Same as {@link #addMaster(Configuration, int)} but executed via
   * {@link User#runAs(PrivilegedExceptionAction)} on the given user.
   */
  public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user)
    throws IOException, InterruptedException {
    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
      @Override
      public JVMClusterUtil.MasterThread run() throws Exception {
        return addMaster(c, index);
      }
    });
  }

  /** Returns the region server at the given position in the list of region server threads. */
  public HRegionServer getRegionServer(int serverNumber) {
    return regionThreads.get(serverNumber).getRegionServer();
  }

  /** Returns Read-only list of region server threads. */
  public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
    return Collections.unmodifiableList(this.regionThreads);
  }

  /**
   * @return List of running servers (Some servers may have been killed or aborted during lifetime
   *         of cluster; these servers are not included in this list).
   */
  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
    List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>();
    List<RegionServerThread> list = getRegionServers();
    for (JVMClusterUtil.RegionServerThread rst : list) {
      if (rst.isAlive()) {
        liveServers.add(rst);
      } else {
        LOG.info("Not alive {}", rst.getName());
      }
    }
    return liveServers;
  }

  /** Returns the Configuration used by this LocalHBaseCluster */
  public Configuration getConfiguration() {
    return this.conf;
  }

  /**
   * Wait for the specified region server to stop. Removes this thread from list of running threads.
   * @return Name of region server that just went down.
   */
  public String waitOnRegionServer(int serverNumber) {
    JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber);
    return waitOnRegionServer(regionServerThread);
  }

  /**
   * Wait for the specified region server to stop. Removes this thread from list of running threads.
   * If the calling thread is interrupted while waiting, the wait is retried and the interrupt
   * status is restored before returning.
   * @return Name of region server that just went down.
   */
  public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
    boolean interrupted = false;
    while (rst.isAlive()) {
      try {
        LOG.info("Waiting on {}", rst.getRegionServer());
        rst.join();
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e);
        interrupted = true;
      }
    }
    regionThreads.remove(rst);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    return rst.getName();
  }

  /** Returns the master at the given position in the list of master threads. */
  public HMaster getMaster(int serverNumber) {
    return masterThreads.get(serverNumber).getMaster();
  }

  /**
   * Gets the current active master, if available. If no active master, returns null.
   * @return the HMaster for the active master
   */
  public HMaster getActiveMaster() {
    for (JVMClusterUtil.MasterThread mt : masterThreads) {
      // Ensure that the current active master is not stopped.
      // We don't want to return a stopping master as an active master.
      if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
        return mt.getMaster();
      }
    }
    return null;
  }

  /** Returns Read-only list of master threads. */
  public List<JVMClusterUtil.MasterThread> getMasters() {
    return Collections.unmodifiableList(this.masterThreads);
  }

  /**
   * @return List of running master servers (Some servers may have been killed or aborted during
   *         lifetime of cluster; these servers are not included in this list).
   */
  public List<JVMClusterUtil.MasterThread> getLiveMasters() {
    List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>();
    List<JVMClusterUtil.MasterThread> list = getMasters();
    for (JVMClusterUtil.MasterThread mt : list) {
      if (mt.isAlive()) {
        liveServers.add(mt);
      }
    }
    return liveServers;
  }

  /**
   * Wait for the specified master to stop. Removes this thread from list of running threads.
   * @return Name of master that just went down.
   */
  public String waitOnMaster(int serverNumber) {
    JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber);
    return waitOnMaster(masterThread);
  }

  /**
   * Wait for the specified master to stop. Removes this thread from list of running threads. If the
   * calling thread is interrupted while waiting, the wait is retried and the interrupt status is
   * restored before returning.
   * @return Name of master that just went down.
   */
  public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
    boolean interrupted = false;
    while (masterThread.isAlive()) {
      try {
        LOG.info("Waiting on {}", masterThread.getMaster().getServerName());
        masterThread.join();
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for {} to finish. Retrying join",
          masterThread.getName(), e);
        interrupted = true;
      }
    }
    masterThreads.remove(masterThread);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    return masterThread.getName();
  }

  /**
   * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}.
   * Region server threads are waited on first, then master threads. If the calling thread is
   * interrupted while waiting on a thread, that wait is abandoned, the remaining threads are still
   * waited on, and the interrupt status is restored before returning.
   */
  public void join() {
    // Evaluate both unconditionally: masters must be waited on even if the first wait was
    // interrupted.
    boolean regionWaitInterrupted = joinAll(this.regionThreads);
    boolean masterWaitInterrupted = joinAll(this.masterThreads);
    if (regionWaitInterrupted || masterWaitInterrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Waits on every live thread in {@code threads} via {@link Threads#threadDumpingIsAlive}.
   * @return true if any of the waits was cut short by an interrupt
   */
  private static boolean joinAll(final List<? extends Thread> threads) {
    boolean interrupted = false;
    for (Thread t : threads) {
      if (t.isAlive()) {
        try {
          Threads.threadDumpingIsAlive(t);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted", e);
          interrupted = true;
        }
      }
    }
    return interrupted;
  }

  /**
   * Start the cluster.
   */
  public void startup() throws IOException {
    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
  }

  /**
   * Shut down the mini HBase cluster
   */
  public void shutdown() {
    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
  }

  /**
   * @param c Configuration to check.
   * @return True if the {@link HConstants#CLUSTER_DISTRIBUTED} setting in {@code c} (default
   *         {@link HConstants#DEFAULT_CLUSTER_DISTRIBUTED}) equals
   *         {@link HConstants#CLUSTER_IS_LOCAL}.
   */
  public static boolean isLocal(final Configuration c) {
    boolean mode =
      c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
    return (mode == HConstants.CLUSTER_IS_LOCAL);
  }

  /**
   * Test things basically work. Starts a default cluster, creates a table named after this class,
   * then shuts the cluster down.
   */
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
    cluster.startup();
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
      TableDescriptor htd =
        TableDescriptorBuilder.newBuilder(TableName.valueOf(cluster.getClass().getName())).build();
      admin.createTable(htd);
    } finally {
      cluster.shutdown();
    }
  }
}