001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.InvocationTargetException; 024import java.util.List; 025import java.util.concurrent.TimeUnit; 026import java.util.function.Supplier; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.master.HMaster; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 036 037/** 038 * Utility used running a cluster all in the one JVM. 039 */ 040@InterfaceAudience.Private 041public class JVMClusterUtil { 042 private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class); 043 044 /** 045 * Datastructure to hold RegionServer Thread and RegionServer instance 046 */ 047 public static class RegionServerThread extends Thread { 048 private final HRegionServer regionServer; 049 050 public RegionServerThread(final HRegionServer r, final int index) { 051 super(r, "RS:" + index + ";" + r.getServerName().toShortString()); 052 this.regionServer = r; 053 } 054 055 /** Returns the region server */ 056 public HRegionServer getRegionServer() { 057 return this.regionServer; 058 } 059 060 /** 061 * Block until the region server has come online, indicating it is ready to be used. 062 */ 063 public void waitForServerOnline() { 064 // The server is marked online after the init method completes inside of 065 // the HRS#run method. HRS#init can fail for whatever region. In those 066 // cases, we'll jump out of the run without setting online flag. Check 067 // stopRequested so we don't wait here a flag that will never be flipped. 068 regionServer.waitForServerOnline(); 069 } 070 } 071 072 /** 073 * Creates a {@link RegionServerThread}. Call 'start' on the returned thread to make it run. 074 * @param c Configuration to use. 075 * @param hrsc Class to create. 076 * @param index Used distinguishing the object returned. n * @return Region server added. 077 */ 078 public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c, 079 final Class<? extends HRegionServer> hrsc, final int index) throws IOException { 080 HRegionServer server; 081 try { 082 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class); 083 ctor.setAccessible(true); 084 server = ctor.newInstance(c); 085 } catch (InvocationTargetException ite) { 086 Throwable target = ite.getTargetException(); 087 throw new RuntimeException("Failed construction of RegionServer: " + hrsc.toString() 088 + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target); 089 } catch (Exception e) { 090 throw new IOException(e); 091 } 092 return new JVMClusterUtil.RegionServerThread(server, index); 093 } 094 095 /** 096 * Datastructure to hold Master Thread and Master instance 097 */ 098 public static class MasterThread extends Thread { 099 private final HMaster master; 100 101 public MasterThread(final HMaster m, final int index) { 102 super(m, "M:" + index + ";" + m.getServerName().toShortString()); 103 this.master = m; 104 } 105 106 /** Returns the master */ 107 public HMaster getMaster() { 108 return this.master; 109 } 110 } 111 112 /** 113 * Creates a {@link MasterThread}. Call 'start' on the returned thread to make it run. 114 * @param c Configuration to use. 115 * @param hmc Class to create. 116 * @param index Used distinguishing the object returned. n * @return Master added. 117 */ 118 public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c, 119 final Class<? extends HMaster> hmc, final int index) throws IOException { 120 HMaster server; 121 try { 122 server = hmc.getConstructor(Configuration.class).newInstance(c); 123 } catch (InvocationTargetException ite) { 124 Throwable target = ite.getTargetException(); 125 throw new RuntimeException("Failed construction of Master: " + hmc.toString() 126 + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target); 127 } catch (Exception e) { 128 throw new IOException(e); 129 } 130 // Needed if a master based registry is configured for internal cluster connections. Here, we 131 // just add the current master host port since we do not know other master addresses up front 132 // in mini cluster tests. 133 c.set(HConstants.MASTER_ADDRS_KEY, 134 Preconditions.checkNotNull(server.getServerName().getAddress()).toString()); 135 return new JVMClusterUtil.MasterThread(server, index); 136 } 137 138 private static JVMClusterUtil.MasterThread 139 findActiveMaster(List<JVMClusterUtil.MasterThread> masters) { 140 for (JVMClusterUtil.MasterThread t : masters) { 141 if (t.master.isActiveMaster()) { 142 return t; 143 } 144 } 145 146 return null; 147 } 148 149 /** 150 * Start the cluster. Waits until there is a primary master initialized and returns its address. 151 * nn * @return Address to use contacting primary master. 152 */ 153 public static String startup(final List<JVMClusterUtil.MasterThread> masters, 154 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException { 155 // Implementation note: This method relies on timed sleeps in a loop. It's not great, and 156 // should probably be re-written to use actual synchronization objects, but it's ok for now 157 158 Configuration configuration = null; 159 160 if (masters == null || masters.isEmpty()) { 161 return null; 162 } 163 164 for (JVMClusterUtil.MasterThread t : masters) { 165 configuration = t.getMaster().getConfiguration(); 166 t.start(); 167 } 168 169 // Wait for an active master 170 // having an active master before starting the region threads allows 171 // then to succeed on their connection to master 172 final int startTimeout = configuration != null 173 ? Integer.parseInt(configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) 174 : 30000; 175 waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null); 176 177 if (regionservers != null) { 178 for (JVMClusterUtil.RegionServerThread t : regionservers) { 179 t.start(); 180 } 181 } 182 183 // Wait for an active master to be initialized (implies being master) 184 // with this, when we return the cluster is complete 185 final int initTimeout = configuration != null 186 ? Integer.parseInt(configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000")) 187 : 200000; 188 waitForEvent(initTimeout, "initialized", () -> { 189 JVMClusterUtil.MasterThread t = findActiveMaster(masters); 190 // master thread should never be null at this point, but let's keep the check anyway 191 return t != null && t.master.isInitialized(); 192 }); 193 194 return findActiveMaster(masters).master.getServerName().toString(); 195 } 196 197 /** 198 * Utility method to wait some time for an event to occur, and then return control to the caller. 199 * @param millis How long to wait, in milliseconds. 200 * @param action The action that we are waiting for. Will be used in log message if the event does 201 * not occur. 202 * @param check A Supplier that will be checked periodically to produce an updated true/false 203 * result indicating if the expected event has happened or not. 204 * @throws InterruptedIOException If we are interrupted while waiting for the event. 205 * @throws RuntimeException If we reach the specified timeout while waiting for the event. 206 */ 207 private static void waitForEvent(long millis, String action, Supplier<Boolean> check) 208 throws InterruptedIOException { 209 long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis); 210 211 while (true) { 212 if (check.get()) { 213 return; 214 } 215 216 if (System.nanoTime() > end) { 217 String msg = "Master not " + action + " after " + millis + "ms"; 218 Threads.printThreadInfo(System.out, "Thread dump because: " + msg); 219 throw new RuntimeException(msg); 220 } 221 222 try { 223 Thread.sleep(100); 224 } catch (InterruptedException e) { 225 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 226 } 227 } 228 229 } 230 231 /** 232 * nn 233 */ 234 public static void shutdown(final List<MasterThread> masters, 235 final List<RegionServerThread> regionservers) { 236 LOG.debug("Shutting down HBase Cluster"); 237 if (masters != null) { 238 // Do backups first. 239 JVMClusterUtil.MasterThread activeMaster = null; 240 for (JVMClusterUtil.MasterThread t : masters) { 241 // Master was killed but could be still considered as active. Check first if it is stopped. 242 if (!t.master.isStopped()) { 243 if (!t.master.isActiveMaster()) { 244 try { 245 t.master.stopMaster(); 246 } catch (IOException e) { 247 LOG.error("Exception occurred while stopping master", e); 248 } 249 LOG.info("Stopped backup Master {} is stopped: {}", t.master.hashCode(), 250 t.master.isStopped()); 251 } else { 252 if (activeMaster != null) { 253 LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode()); 254 } 255 activeMaster = t; 256 LOG.debug("Found active master hash={}, stopped={}", t.master.hashCode(), 257 t.master.isStopped()); 258 } 259 } 260 } 261 // Do active after. 262 if (activeMaster != null) { 263 try { 264 activeMaster.master.shutdown(); 265 } catch (IOException e) { 266 LOG.error("Exception occurred in HMaster.shutdown()", e); 267 } 268 } 269 } 270 boolean wasInterrupted = false; 271 final long maxTime = EnvironmentEdgeManager.currentTime() + 30 * 1000; 272 if (regionservers != null) { 273 // first try nicely. 274 for (RegionServerThread t : regionservers) { 275 t.getRegionServer().stop("Shutdown requested"); 276 } 277 for (RegionServerThread t : regionservers) { 278 long now = EnvironmentEdgeManager.currentTime(); 279 if (t.isAlive() && !wasInterrupted && now < maxTime) { 280 try { 281 t.join(maxTime - now); 282 } catch (InterruptedException e) { 283 LOG.info("Got InterruptedException on shutdown - " 284 + "not waiting anymore on region server ends", e); 285 wasInterrupted = true; // someone wants us to speed up. 286 } 287 } 288 } 289 290 // Let's try to interrupt the remaining threads if any. 291 for (int i = 0; i < 100; ++i) { 292 boolean atLeastOneLiveServer = false; 293 for (RegionServerThread t : regionservers) { 294 if (t.isAlive()) { 295 atLeastOneLiveServer = true; 296 try { 297 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting"); 298 t.join(1000); 299 } catch (InterruptedException e) { 300 wasInterrupted = true; 301 } 302 } 303 } 304 if (!atLeastOneLiveServer) break; 305 for (RegionServerThread t : regionservers) { 306 if (t.isAlive()) { 307 LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " 308 + "if > 3 attempts: i=" + i); 309 if (i > 3) { 310 Threads.printThreadInfo(System.out, "Thread dump " + t.getName()); 311 } 312 t.interrupt(); 313 } 314 } 315 } 316 } 317 318 if (masters != null) { 319 for (JVMClusterUtil.MasterThread t : masters) { 320 while (t.master.isAlive() && !wasInterrupted) { 321 try { 322 // The below has been replaced to debug sometime hangs on end of 323 // tests. 324 // this.master.join(): 325 Threads.threadDumpingIsAlive(t.master); 326 } catch (InterruptedException e) { 327 LOG.info( 328 "Got InterruptedException on shutdown - " + "not waiting anymore on master ends", e); 329 wasInterrupted = true; 330 } 331 } 332 } 333 } 334 LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and " 335 + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) " 336 + (wasInterrupted ? "interrupted" : "complete")); 337 338 if (wasInterrupted) { 339 Thread.currentThread().interrupt(); 340 } 341 } 342}