001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.InterruptedIOException; 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.lang.reflect.InvocationTargetException; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027import java.util.function.Supplier; 028 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.master.HMaster; 035import org.apache.hadoop.hbase.regionserver.HRegionServer; 036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 037 038/** 039 * Utility used running a cluster all in the one JVM. 040 */ 041@InterfaceAudience.Private 042public class JVMClusterUtil { 043 private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class); 044 045 /** 046 * Datastructure to hold RegionServer Thread and RegionServer instance 047 */ 048 public static class RegionServerThread extends Thread { 049 private final HRegionServer regionServer; 050 051 public RegionServerThread(final HRegionServer r, final int index) { 052 super(r, "RS:" + index + ";" + r.getServerName().toShortString()); 053 this.regionServer = r; 054 } 055 056 /** @return the region server */ 057 public HRegionServer getRegionServer() { 058 return this.regionServer; 059 } 060 061 /** 062 * Block until the region server has come online, indicating it is ready 063 * to be used. 064 */ 065 public void waitForServerOnline() { 066 // The server is marked online after the init method completes inside of 067 // the HRS#run method. HRS#init can fail for whatever region. In those 068 // cases, we'll jump out of the run without setting online flag. Check 069 // stopRequested so we don't wait here a flag that will never be flipped. 070 regionServer.waitForServerOnline(); 071 } 072 } 073 074 /** 075 * Creates a {@link RegionServerThread}. 076 * Call 'start' on the returned thread to make it run. 077 * @param c Configuration to use. 078 * @param hrsc Class to create. 079 * @param index Used distinguishing the object returned. 080 * @throws IOException 081 * @return Region server added. 082 */ 083 public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c, 084 final Class<? extends HRegionServer> hrsc, final int index) throws IOException { 085 HRegionServer server; 086 try { 087 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class); 088 ctor.setAccessible(true); 089 server = ctor.newInstance(c); 090 } catch (InvocationTargetException ite) { 091 Throwable target = ite.getTargetException(); 092 throw new RuntimeException("Failed construction of RegionServer: " + 093 hrsc.toString() + ((target.getCause() != null)? 094 target.getCause().getMessage(): ""), target); 095 } catch (Exception e) { 096 throw new IOException(e); 097 } 098 return new JVMClusterUtil.RegionServerThread(server, index); 099 } 100 101 102 /** 103 * Datastructure to hold Master Thread and Master instance 104 */ 105 public static class MasterThread extends Thread { 106 private final HMaster master; 107 108 public MasterThread(final HMaster m, final int index) { 109 super(m, "M:" + index + ";" + m.getServerName().toShortString()); 110 this.master = m; 111 } 112 113 /** @return the master */ 114 public HMaster getMaster() { 115 return this.master; 116 } 117 } 118 119 /** 120 * Creates a {@link MasterThread}. 121 * Call 'start' on the returned thread to make it run. 122 * @param c Configuration to use. 123 * @param hmc Class to create. 124 * @param index Used distinguishing the object returned. 125 * @throws IOException 126 * @return Master added. 127 */ 128 public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c, 129 final Class<? extends HMaster> hmc, final int index) throws IOException { 130 HMaster server; 131 try { 132 server = hmc.getConstructor(Configuration.class).newInstance(c); 133 } catch (InvocationTargetException ite) { 134 Throwable target = ite.getTargetException(); 135 throw new RuntimeException("Failed construction of Master: " + 136 hmc.toString() + ((target.getCause() != null)? 137 target.getCause().getMessage(): ""), target); 138 } catch (Exception e) { 139 throw new IOException(e); 140 } 141 // Needed if a master based registry is configured for internal cluster connections. Here, we 142 // just add the current master host port since we do not know other master addresses up front 143 // in mini cluster tests. 144 c.set(HConstants.MASTER_ADDRS_KEY, 145 Preconditions.checkNotNull(server.getServerName().getAddress()).toString()); 146 return new JVMClusterUtil.MasterThread(server, index); 147 } 148 149 private static JVMClusterUtil.MasterThread findActiveMaster( 150 List<JVMClusterUtil.MasterThread> masters) { 151 for (JVMClusterUtil.MasterThread t : masters) { 152 if (t.master.isActiveMaster()) { 153 return t; 154 } 155 } 156 157 return null; 158 } 159 160 /** 161 * Start the cluster. Waits until there is a primary master initialized 162 * and returns its address. 163 * @param masters 164 * @param regionservers 165 * @return Address to use contacting primary master. 166 */ 167 public static String startup(final List<JVMClusterUtil.MasterThread> masters, 168 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException { 169 // Implementation note: This method relies on timed sleeps in a loop. It's not great, and 170 // should probably be re-written to use actual synchronization objects, but it's ok for now 171 172 Configuration configuration = null; 173 174 if (masters == null || masters.isEmpty()) { 175 return null; 176 } 177 178 for (JVMClusterUtil.MasterThread t : masters) { 179 configuration = t.getMaster().getConfiguration(); 180 t.start(); 181 } 182 183 // Wait for an active master 184 // having an active master before starting the region threads allows 185 // then to succeed on their connection to master 186 final int startTimeout = configuration != null ? Integer.parseInt( 187 configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000; 188 waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null); 189 190 if (regionservers != null) { 191 for (JVMClusterUtil.RegionServerThread t: regionservers) { 192 t.start(); 193 } 194 } 195 196 // Wait for an active master to be initialized (implies being master) 197 // with this, when we return the cluster is complete 198 final int initTimeout = configuration != null ? Integer.parseInt( 199 configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000")) : 200000; 200 waitForEvent(initTimeout, "initialized", () -> { 201 JVMClusterUtil.MasterThread t = findActiveMaster(masters); 202 // master thread should never be null at this point, but let's keep the check anyway 203 return t != null && t.master.isInitialized(); 204 } 205 ); 206 207 return findActiveMaster(masters).master.getServerName().toString(); 208 } 209 210 /** 211 * Utility method to wait some time for an event to occur, and then return control to the caller. 212 * @param millis How long to wait, in milliseconds. 213 * @param action The action that we are waiting for. Will be used in log message if the event 214 * does not occur. 215 * @param check A Supplier that will be checked periodically to produce an updated true/false 216 * result indicating if the expected event has happened or not. 217 * @throws InterruptedIOException If we are interrupted while waiting for the event. 218 * @throws RuntimeException If we reach the specified timeout while waiting for the event. 219 */ 220 private static void waitForEvent(long millis, String action, Supplier<Boolean> check) 221 throws InterruptedIOException { 222 long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis); 223 224 while (true) { 225 if (check.get()) { 226 return; 227 } 228 229 if (System.nanoTime() > end) { 230 String msg = "Master not " + action + " after " + millis + "ms"; 231 Threads.printThreadInfo(System.out, "Thread dump because: " + msg); 232 throw new RuntimeException(msg); 233 } 234 235 try { 236 Thread.sleep(100); 237 } catch (InterruptedException e) { 238 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 239 } 240 } 241 242 } 243 244 /** 245 * @param masters 246 * @param regionservers 247 */ 248 public static void shutdown(final List<MasterThread> masters, 249 final List<RegionServerThread> regionservers) { 250 LOG.debug("Shutting down HBase Cluster"); 251 if (masters != null) { 252 // Do backups first. 253 JVMClusterUtil.MasterThread activeMaster = null; 254 for (JVMClusterUtil.MasterThread t : masters) { 255 // Master was killed but could be still considered as active. Check first if it is stopped. 256 if (!t.master.isStopped()) { 257 if (!t.master.isActiveMaster()) { 258 try { 259 t.master.stopMaster(); 260 } catch (IOException e) { 261 LOG.error("Exception occurred while stopping master", e); 262 } 263 LOG.info("Stopped backup Master {} is stopped: {}", 264 t.master.hashCode(), t.master.isStopped()); 265 } else { 266 if (activeMaster != null) { 267 LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode()); 268 } 269 activeMaster = t; 270 LOG.debug("Found active master hash={}, stopped={}", 271 t.master.hashCode(), t.master.isStopped()); 272 } 273 } 274 } 275 // Do active after. 276 if (activeMaster != null) { 277 try { 278 activeMaster.master.shutdown(); 279 } catch (IOException e) { 280 LOG.error("Exception occurred in HMaster.shutdown()", e); 281 } 282 } 283 } 284 boolean wasInterrupted = false; 285 final long maxTime = System.currentTimeMillis() + 30 * 1000; 286 if (regionservers != null) { 287 // first try nicely. 288 for (RegionServerThread t : regionservers) { 289 t.getRegionServer().stop("Shutdown requested"); 290 } 291 for (RegionServerThread t : regionservers) { 292 long now = System.currentTimeMillis(); 293 if (t.isAlive() && !wasInterrupted && now < maxTime) { 294 try { 295 t.join(maxTime - now); 296 } catch (InterruptedException e) { 297 LOG.info("Got InterruptedException on shutdown - " + 298 "not waiting anymore on region server ends", e); 299 wasInterrupted = true; // someone wants us to speed up. 300 } 301 } 302 } 303 304 // Let's try to interrupt the remaining threads if any. 305 for (int i = 0; i < 100; ++i) { 306 boolean atLeastOneLiveServer = false; 307 for (RegionServerThread t : regionservers) { 308 if (t.isAlive()) { 309 atLeastOneLiveServer = true; 310 try { 311 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting"); 312 t.join(1000); 313 } catch (InterruptedException e) { 314 wasInterrupted = true; 315 } 316 } 317 } 318 if (!atLeastOneLiveServer) break; 319 for (RegionServerThread t : regionservers) { 320 if (t.isAlive()) { 321 LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " + 322 "if > 3 attempts: i=" + i); 323 if (i > 3) { 324 Threads.printThreadInfo(System.out, "Thread dump " + t.getName()); 325 } 326 t.interrupt(); 327 } 328 } 329 } 330 } 331 332 if (masters != null) { 333 for (JVMClusterUtil.MasterThread t : masters) { 334 while (t.master.isAlive() && !wasInterrupted) { 335 try { 336 // The below has been replaced to debug sometime hangs on end of 337 // tests. 338 // this.master.join(): 339 Threads.threadDumpingIsAlive(t.master); 340 } catch(InterruptedException e) { 341 LOG.info("Got InterruptedException on shutdown - " + 342 "not waiting anymore on master ends", e); 343 wasInterrupted = true; 344 } 345 } 346 } 347 } 348 LOG.info("Shutdown of " + 349 ((masters != null) ? masters.size() : "0") + " master(s) and " + 350 ((regionservers != null) ? regionservers.size() : "0") + 351 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete")); 352 353 if (wasInterrupted){ 354 Thread.currentThread().interrupt(); 355 } 356 } 357}