001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.InvocationTargetException;
024import java.util.List;
025import java.util.concurrent.TimeUnit;
026import java.util.function.Supplier;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
036
/**
 * Utility for running a cluster entirely within a single JVM.
 */
040@InterfaceAudience.Private
041public class JVMClusterUtil {
042  private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class);
043
044  /**
045   * Datastructure to hold RegionServer Thread and RegionServer instance
046   */
047  public static class RegionServerThread extends Thread {
048    private final HRegionServer regionServer;
049
050    public RegionServerThread(final HRegionServer r, final int index) {
051      super(r, "RS:" + index + ";" + r.getServerName().toShortString());
052      this.regionServer = r;
053    }
054
055    /** Returns the region server */
056    public HRegionServer getRegionServer() {
057      return this.regionServer;
058    }
059
060    /**
061     * Block until the region server has come online, indicating it is ready to be used.
062     */
063    public void waitForServerOnline() {
064      // The server is marked online after the init method completes inside of
065      // the HRS#run method. HRS#init can fail for whatever region. In those
066      // cases, we'll jump out of the run without setting online flag. Check
067      // stopRequested so we don't wait here a flag that will never be flipped.
068      regionServer.waitForServerOnline();
069    }
070  }
071
072  /**
073   * Creates a {@link RegionServerThread}. Call 'start' on the returned thread to make it run.
074   * @param c     Configuration to use.
075   * @param hrsc  Class to create.
076   * @param index Used distinguishing the object returned.
077   * @return Region server added.
078   */
079  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
080    final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
081    HRegionServer server;
082    try {
083      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
084      ctor.setAccessible(true);
085      server = ctor.newInstance(c);
086    } catch (InvocationTargetException ite) {
087      Throwable target = ite.getTargetException();
088      throw new RuntimeException("Failed construction of RegionServer: " + hrsc.toString()
089        + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target);
090    } catch (Exception e) {
091      throw new IOException(e);
092    }
093    return new JVMClusterUtil.RegionServerThread(server, index);
094  }
095
096  /**
097   * Datastructure to hold Master Thread and Master instance
098   */
099  public static class MasterThread extends Thread {
100    private final HMaster master;
101
102    public MasterThread(final HMaster m, final int index) {
103      super(m, "M:" + index + ";" + m.getServerName().toShortString());
104      this.master = m;
105    }
106
107    /** Returns the master */
108    public HMaster getMaster() {
109      return this.master;
110    }
111  }
112
113  /**
114   * Creates a {@link MasterThread}. Call 'start' on the returned thread to make it run.
115   * @param c     Configuration to use.
116   * @param hmc   Class to create.
117   * @param index Used distinguishing the object returned.
118   * @return Master added.
119   */
120  public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
121    final Class<? extends HMaster> hmc, final int index) throws IOException {
122    HMaster server;
123    try {
124      server = hmc.getConstructor(Configuration.class).newInstance(c);
125    } catch (InvocationTargetException ite) {
126      Throwable target = ite.getTargetException();
127      throw new RuntimeException("Failed construction of Master: " + hmc.toString()
128        + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target);
129    } catch (Exception e) {
130      throw new IOException(e);
131    }
132    // Needed if a master based registry is configured for internal cluster connections. Here, we
133    // just add the current master host port since we do not know other master addresses up front
134    // in mini cluster tests.
135    c.set(HConstants.MASTER_ADDRS_KEY,
136      Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
137    return new JVMClusterUtil.MasterThread(server, index);
138  }
139
140  private static JVMClusterUtil.MasterThread
141    findActiveMaster(List<JVMClusterUtil.MasterThread> masters) {
142    for (JVMClusterUtil.MasterThread t : masters) {
143      if (t.master.isActiveMaster()) {
144        return t;
145      }
146    }
147
148    return null;
149  }
150
151  /**
152   * Start the cluster. Waits until there is a primary master initialized and returns its address.
153   * @return Address to use contacting primary master.
154   */
155  public static String startup(final List<JVMClusterUtil.MasterThread> masters,
156    final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
157    // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
158    // should probably be re-written to use actual synchronization objects, but it's ok for now
159
160    Configuration configuration = null;
161
162    if (masters == null || masters.isEmpty()) {
163      return null;
164    }
165
166    for (JVMClusterUtil.MasterThread t : masters) {
167      configuration = t.getMaster().getConfiguration();
168      t.start();
169    }
170
171    // Wait for an active master
172    // having an active master before starting the region threads allows
173    // then to succeed on their connection to master
174    final int startTimeout = configuration != null
175      ? Integer.parseInt(configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000"))
176      : 30000;
177    waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null);
178
179    if (regionservers != null) {
180      for (JVMClusterUtil.RegionServerThread t : regionservers) {
181        t.start();
182      }
183    }
184
185    // Wait for an active master to be initialized (implies being master)
186    // with this, when we return the cluster is complete
187    final int initTimeout = configuration != null
188      ? Integer.parseInt(configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000"))
189      : 200000;
190    waitForEvent(initTimeout, "initialized", () -> {
191      JVMClusterUtil.MasterThread t = findActiveMaster(masters);
192      // master thread should never be null at this point, but let's keep the check anyway
193      return t != null && t.master.isInitialized();
194    });
195
196    return findActiveMaster(masters).master.getServerName().toString();
197  }
198
199  /**
200   * Utility method to wait some time for an event to occur, and then return control to the caller.
201   * @param millis How long to wait, in milliseconds.
202   * @param action The action that we are waiting for. Will be used in log message if the event does
203   *               not occur.
204   * @param check  A Supplier that will be checked periodically to produce an updated true/false
205   *               result indicating if the expected event has happened or not.
206   * @throws InterruptedIOException If we are interrupted while waiting for the event.
207   * @throws RuntimeException       If we reach the specified timeout while waiting for the event.
208   */
209  private static void waitForEvent(long millis, String action, Supplier<Boolean> check)
210    throws InterruptedIOException {
211    long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
212
213    while (true) {
214      if (check.get()) {
215        return;
216      }
217
218      if (System.nanoTime() > end) {
219        String msg = "Master not " + action + " after " + millis + "ms";
220        Threads.printThreadInfo(System.out, "Thread dump because: " + msg);
221        throw new RuntimeException(msg);
222      }
223
224      try {
225        Thread.sleep(100);
226      } catch (InterruptedException e) {
227        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
228      }
229    }
230
231  }
232
233  /**
234   *   */
235  public static void shutdown(final List<MasterThread> masters,
236    final List<RegionServerThread> regionservers) {
237    LOG.debug("Shutting down HBase Cluster");
238    if (masters != null) {
239      // Do backups first.
240      JVMClusterUtil.MasterThread activeMaster = null;
241      for (JVMClusterUtil.MasterThread t : masters) {
242        // Master was killed but could be still considered as active. Check first if it is stopped.
243        if (!t.master.isStopped()) {
244          if (!t.master.isActiveMaster()) {
245            try {
246              t.master.stopMaster();
247            } catch (IOException e) {
248              LOG.error("Exception occurred while stopping master", e);
249            }
250            LOG.info("Stopped backup Master {} is stopped: {}", t.master.hashCode(),
251              t.master.isStopped());
252          } else {
253            if (activeMaster != null) {
254              LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode());
255            }
256            activeMaster = t;
257            LOG.debug("Found active master hash={}, stopped={}", t.master.hashCode(),
258              t.master.isStopped());
259          }
260        }
261      }
262      // Do active after.
263      if (activeMaster != null) {
264        try {
265          activeMaster.master.shutdown();
266        } catch (IOException e) {
267          LOG.error("Exception occurred in HMaster.shutdown()", e);
268        }
269      }
270    }
271    boolean wasInterrupted = false;
272    final long maxTime = EnvironmentEdgeManager.currentTime() + 30 * 1000;
273    if (regionservers != null) {
274      // first try nicely.
275      for (RegionServerThread t : regionservers) {
276        t.getRegionServer().stop("Shutdown requested");
277      }
278      for (RegionServerThread t : regionservers) {
279        long now = EnvironmentEdgeManager.currentTime();
280        if (t.isAlive() && !wasInterrupted && now < maxTime) {
281          try {
282            t.join(maxTime - now);
283          } catch (InterruptedException e) {
284            LOG.info("Got InterruptedException on shutdown - "
285              + "not waiting anymore on region server ends", e);
286            wasInterrupted = true; // someone wants us to speed up.
287          }
288        }
289      }
290
291      // Let's try to interrupt the remaining threads if any.
292      for (int i = 0; i < 100; ++i) {
293        boolean atLeastOneLiveServer = false;
294        for (RegionServerThread t : regionservers) {
295          if (t.isAlive()) {
296            atLeastOneLiveServer = true;
297            try {
298              LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
299              t.join(1000);
300            } catch (InterruptedException e) {
301              wasInterrupted = true;
302            }
303          }
304        }
305        if (!atLeastOneLiveServer) break;
306        for (RegionServerThread t : regionservers) {
307          if (t.isAlive()) {
308            LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump "
309              + "if > 3 attempts: i=" + i);
310            if (i > 3) {
311              Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
312            }
313            t.interrupt();
314          }
315        }
316      }
317    }
318
319    if (masters != null) {
320      for (JVMClusterUtil.MasterThread t : masters) {
321        while (t.master.isAlive() && !wasInterrupted) {
322          try {
323            // The below has been replaced to debug sometime hangs on end of
324            // tests.
325            // this.master.join():
326            Threads.threadDumpingIsAlive(t.master);
327          } catch (InterruptedException e) {
328            LOG.info(
329              "Got InterruptedException on shutdown - " + "not waiting anymore on master ends", e);
330            wasInterrupted = true;
331          }
332        }
333      }
334    }
335    LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and "
336      + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) "
337      + (wasInterrupted ? "interrupted" : "complete"));
338
339    if (wasInterrupted) {
340      Thread.currentThread().interrupt();
341    }
342  }
343}