001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.client.Admin; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegionServer; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.util.JVMClusterUtil; 034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This class creates a single process HBase cluster. One thread is created for a master and one per 042 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to 043 * close it all down. {@link #join} the cluster is you want to wait on shutdown completion. 044 * <p> 045 * Runs master on port 16000 by default. Because we can't just kill the process -- not till 046 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote 047 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of 048 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead 049 * of 16000. 050 */ 051@InterfaceAudience.Public 052public class LocalHBaseCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class); 054 private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>(); 055 private final List<JVMClusterUtil.RegionServerThread> regionThreads = 056 new CopyOnWriteArrayList<>(); 057 private final static int DEFAULT_NO = 1; 058 /** local mode */ 059 public static final String LOCAL = "local"; 060 /** 'local:' */ 061 public static final String LOCAL_COLON = LOCAL + ":"; 062 public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports"; 063 064 private final Configuration conf; 065 private final Class<? extends HMaster> masterClass; 066 private final Class<? extends HRegionServer> regionServerClass; 067 068 /** 069 * Constructor. nn 070 */ 071 public LocalHBaseCluster(final Configuration conf) throws IOException { 072 this(conf, DEFAULT_NO); 073 } 074 075 /** 076 * Constructor. 077 * @param conf Configuration to use. Post construction has the master's address. 078 * @param noRegionServers Count of regionservers to start. n 079 */ 080 public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException { 081 this(conf, 1, 0, noRegionServers, getMasterImplementation(conf), 082 getRegionServerImplementation(conf)); 083 } 084 085 /** 086 * Constructor. 087 * @param conf Configuration to use. Post construction has the active master address. 088 * @param noMasters Count of masters to start. 089 * @param noRegionServers Count of regionservers to start. n 090 */ 091 public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers) 092 throws IOException { 093 this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf), 094 getRegionServerImplementation(conf)); 095 } 096 097 @SuppressWarnings("unchecked") 098 private static Class<? extends HRegionServer> 099 getRegionServerImplementation(final Configuration conf) { 100 return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, 101 HRegionServer.class); 102 } 103 104 @SuppressWarnings("unchecked") 105 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) { 106 return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class); 107 } 108 109 public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers, 110 final Class<? extends HMaster> masterClass, 111 final Class<? extends HRegionServer> regionServerClass) throws IOException { 112 this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass); 113 } 114 115 /** 116 * Constructor. 117 * @param conf Configuration to use. Post construction has the master's address. 118 * @param noMasters Count of masters to start. 119 * @param noRegionServers Count of regionservers to start. nnn 120 */ 121 @SuppressWarnings("unchecked") 122 public LocalHBaseCluster(final Configuration conf, final int noMasters, 123 final int noAlwaysStandByMasters, final int noRegionServers, 124 final Class<? extends HMaster> masterClass, 125 final Class<? extends HRegionServer> regionServerClass) throws IOException { 126 this.conf = conf; 127 128 // When active, if a port selection is default then we switch to random 129 if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) { 130 if ( 131 conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT) 132 == HConstants.DEFAULT_MASTER_PORT 133 ) { 134 LOG.debug("Setting Master Port to random."); 135 conf.set(HConstants.MASTER_PORT, "0"); 136 } 137 if ( 138 conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT) 139 == HConstants.DEFAULT_REGIONSERVER_PORT 140 ) { 141 LOG.debug("Setting RegionServer Port to random."); 142 conf.set(HConstants.REGIONSERVER_PORT, "0"); 143 } 144 // treat info ports special; expressly don't change '-1' (keep off) 145 // in case we make that the default behavior. 146 if ( 147 conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1 148 && conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 149 HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT 150 ) { 151 LOG.debug("Setting RS InfoServer Port to random."); 152 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); 153 } 154 if ( 155 conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 156 && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT) 157 == HConstants.DEFAULT_MASTER_INFOPORT 158 ) { 159 LOG.debug("Setting Master InfoServer Port to random."); 160 conf.set(HConstants.MASTER_INFO_PORT, "0"); 161 } 162 } 163 164 this.masterClass = 165 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass); 166 // Start the HMasters. 167 int i; 168 for (i = 0; i < noMasters; i++) { 169 addMaster(new Configuration(conf), i); 170 } 171 for (int j = 0; j < noAlwaysStandByMasters; j++) { 172 Configuration c = new Configuration(conf); 173 c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster"); 174 addMaster(c, i + j); 175 } 176 // Start the HRegionServers. 177 this.regionServerClass = (Class<? extends HRegionServer>) conf 178 .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass); 179 180 for (int j = 0; j < noRegionServers; j++) { 181 addRegionServer(new Configuration(conf), j); 182 } 183 } 184 185 public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException { 186 return addRegionServer(new Configuration(conf), this.regionThreads.size()); 187 } 188 189 @SuppressWarnings("unchecked") 190 public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index) 191 throws IOException { 192 // Create each regionserver with its own Configuration instance so each has 193 // its Connection instance rather than share (see HBASE_INSTANCES down in 194 // the guts of ConnectionManager). 195 JVMClusterUtil.RegionServerThread rst = 196 JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf 197 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); 198 199 this.regionThreads.add(rst); 200 return rst; 201 } 202 203 public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config, 204 final int index, User user) throws IOException, InterruptedException { 205 return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() { 206 @Override 207 public JVMClusterUtil.RegionServerThread run() throws Exception { 208 return addRegionServer(config, index); 209 } 210 }); 211 } 212 213 public JVMClusterUtil.MasterThread addMaster() throws IOException { 214 return addMaster(new Configuration(conf), this.masterThreads.size()); 215 } 216 217 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index) 218 throws IOException { 219 // Create each master with its own Configuration instance so each has 220 // its Connection instance rather than share (see HBASE_INSTANCES down in 221 // the guts of ConnectionManager. 222 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, 223 (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index); 224 this.masterThreads.add(mt); 225 // Refresh the master address config. 226 List<String> masterHostPorts = new ArrayList<>(); 227 getMasters().forEach(masterThread -> masterHostPorts 228 .add(masterThread.getMaster().getServerName().getAddress().toString())); 229 conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts)); 230 return mt; 231 } 232 233 public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user) 234 throws IOException, InterruptedException { 235 return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() { 236 @Override 237 public JVMClusterUtil.MasterThread run() throws Exception { 238 return addMaster(c, index); 239 } 240 }); 241 } 242 243 /** 244 * n * @return region server 245 */ 246 public HRegionServer getRegionServer(int serverNumber) { 247 return regionThreads.get(serverNumber).getRegionServer(); 248 } 249 250 /** Returns Read-only list of region server threads. */ 251 public List<JVMClusterUtil.RegionServerThread> getRegionServers() { 252 return Collections.unmodifiableList(this.regionThreads); 253 } 254 255 /** 256 * @return List of running servers (Some servers may have been killed or aborted during lifetime 257 * of cluster; these servers are not included in this list). 258 */ 259 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() { 260 List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>(); 261 List<RegionServerThread> list = getRegionServers(); 262 for (JVMClusterUtil.RegionServerThread rst : list) { 263 if (rst.isAlive()) liveServers.add(rst); 264 else LOG.info("Not alive " + rst.getName()); 265 } 266 return liveServers; 267 } 268 269 /** Returns the Configuration used by this LocalHBaseCluster */ 270 public Configuration getConfiguration() { 271 return this.conf; 272 } 273 274 /** 275 * Wait for the specified region server to stop. Removes this thread from list of running threads. 276 * @return Name of region server that just went down. 277 */ 278 public String waitOnRegionServer(int serverNumber) { 279 JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber); 280 return waitOnRegionServer(regionServerThread); 281 } 282 283 /** 284 * Wait for the specified region server to stop. Removes this thread from list of running threads. 285 * @return Name of region server that just went down. 286 */ 287 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { 288 boolean interrupted = false; 289 while (rst.isAlive()) { 290 try { 291 LOG.info("Waiting on " + rst.getRegionServer().toString()); 292 rst.join(); 293 } catch (InterruptedException e) { 294 LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e); 295 interrupted = true; 296 } 297 } 298 regionThreads.remove(rst); 299 if (interrupted) { 300 Thread.currentThread().interrupt(); 301 } 302 return rst.getName(); 303 } 304 305 /** Returns the HMaster thread */ 306 public HMaster getMaster(int serverNumber) { 307 return masterThreads.get(serverNumber).getMaster(); 308 } 309 310 /** 311 * Gets the current active master, if available. If no active master, returns null. 312 * @return the HMaster for the active master 313 */ 314 public HMaster getActiveMaster() { 315 for (JVMClusterUtil.MasterThread mt : masterThreads) { 316 // Ensure that the current active master is not stopped. 317 // We don't want to return a stopping master as an active master. 318 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { 319 return mt.getMaster(); 320 } 321 } 322 return null; 323 } 324 325 /** Returns Read-only list of master threads. */ 326 public List<JVMClusterUtil.MasterThread> getMasters() { 327 return Collections.unmodifiableList(this.masterThreads); 328 } 329 330 /** 331 * @return List of running master servers (Some servers may have been killed or aborted during 332 * lifetime of cluster; these servers are not included in this list). 333 */ 334 public List<JVMClusterUtil.MasterThread> getLiveMasters() { 335 List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>(); 336 List<JVMClusterUtil.MasterThread> list = getMasters(); 337 for (JVMClusterUtil.MasterThread mt : list) { 338 if (mt.isAlive()) { 339 liveServers.add(mt); 340 } 341 } 342 return liveServers; 343 } 344 345 /** 346 * Wait for the specified master to stop. Removes this thread from list of running threads. 347 * @return Name of master that just went down. 348 */ 349 public String waitOnMaster(int serverNumber) { 350 JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber); 351 return waitOnMaster(masterThread); 352 } 353 354 /** 355 * Wait for the specified master to stop. Removes this thread from list of running threads. 356 * @return Name of master that just went down. 357 */ 358 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) { 359 boolean interrupted = false; 360 while (masterThread.isAlive()) { 361 try { 362 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString()); 363 masterThread.join(); 364 } catch (InterruptedException e) { 365 LOG.error("Interrupted while waiting for {} to finish. Retrying join", 366 masterThread.getName(), e); 367 interrupted = true; 368 } 369 } 370 masterThreads.remove(masterThread); 371 if (interrupted) { 372 Thread.currentThread().interrupt(); 373 } 374 return masterThread.getName(); 375 } 376 377 /** 378 * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}. 379 */ 380 public void join() { 381 if (this.regionThreads != null) { 382 for (Thread t : this.regionThreads) { 383 if (t.isAlive()) { 384 try { 385 Threads.threadDumpingIsAlive(t); 386 } catch (InterruptedException e) { 387 LOG.debug("Interrupted", e); 388 } 389 } 390 } 391 } 392 if (this.masterThreads != null) { 393 for (Thread t : this.masterThreads) { 394 if (t.isAlive()) { 395 try { 396 Threads.threadDumpingIsAlive(t); 397 } catch (InterruptedException e) { 398 LOG.debug("Interrupted", e); 399 } 400 } 401 } 402 } 403 } 404 405 /** 406 * Start the cluster. 407 */ 408 public void startup() throws IOException { 409 JVMClusterUtil.startup(this.masterThreads, this.regionThreads); 410 } 411 412 /** 413 * Shut down the mini HBase cluster 414 */ 415 public void shutdown() { 416 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); 417 } 418 419 /** 420 * @param c Configuration to check. 421 * @return True if a 'local' address in hbase.master value. 422 */ 423 public static boolean isLocal(final Configuration c) { 424 boolean mode = 425 c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); 426 return (mode == HConstants.CLUSTER_IS_LOCAL); 427 } 428 429 /** 430 * Test things basically work. nn 431 */ 432 public static void main(String[] args) throws IOException { 433 Configuration conf = HBaseConfiguration.create(); 434 LocalHBaseCluster cluster = new LocalHBaseCluster(conf); 435 cluster.startup(); 436 Connection connection = ConnectionFactory.createConnection(conf); 437 Admin admin = connection.getAdmin(); 438 try { 439 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cluster.getClass().getName())); 440 admin.createTable(htd); 441 } finally { 442 admin.close(); 443 } 444 connection.close(); 445 cluster.shutdown(); 446 } 447}