001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
import java.io.InterruptedIOException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
035
036/**
037 * Utility used running a cluster all in the one JVM.
038 */
039@InterfaceAudience.Private
040public class JVMClusterUtil {
041  private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class);
042
043  /**
044   * Datastructure to hold RegionServer Thread and RegionServer instance
045   */
046  public static class RegionServerThread extends Thread {
047    private final HRegionServer regionServer;
048
049    public RegionServerThread(final HRegionServer r, final int index) {
050      super(r, "RS:" + index + ";" + r.getServerName().toShortString());
051      this.regionServer = r;
052    }
053
054    /** @return the region server */
055    public HRegionServer getRegionServer() {
056      return this.regionServer;
057    }
058
059    /**
060     * Block until the region server has come online, indicating it is ready
061     * to be used.
062     */
063    public void waitForServerOnline() {
064      // The server is marked online after the init method completes inside of
065      // the HRS#run method.  HRS#init can fail for whatever region.  In those
066      // cases, we'll jump out of the run without setting online flag.  Check
067      // stopRequested so we don't wait here a flag that will never be flipped.
068      regionServer.waitForServerOnline();
069    }
070  }
071
072  /**
073   * Creates a {@link RegionServerThread}.
074   * Call 'start' on the returned thread to make it run.
075   * @param c Configuration to use.
076   * @param hrsc Class to create.
077   * @param index Used distinguishing the object returned.
078   * @throws IOException
079   * @return Region server added.
080   */
081  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
082      final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
083    HRegionServer server;
084    try {
085      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
086      ctor.setAccessible(true);
087      server = ctor.newInstance(c);
088    } catch (InvocationTargetException ite) {
089      Throwable target = ite.getTargetException();
090      throw new RuntimeException("Failed construction of RegionServer: " +
091        hrsc.toString() + ((target.getCause() != null)?
092          target.getCause().getMessage(): ""), target);
093    } catch (Exception e) {
094      throw new IOException(e);
095    }
096    return new JVMClusterUtil.RegionServerThread(server, index);
097  }
098
099
100  /**
101   * Datastructure to hold Master Thread and Master instance
102   */
103  public static class MasterThread extends Thread {
104    private final HMaster master;
105
106    public MasterThread(final HMaster m, final int index) {
107      super(m, "M:" + index + ";" + m.getServerName().toShortString());
108      this.master = m;
109    }
110
111    /** @return the master */
112    public HMaster getMaster() {
113      return this.master;
114    }
115  }
116
117  /**
118   * Creates a {@link MasterThread}.
119   * Call 'start' on the returned thread to make it run.
120   * @param c Configuration to use.
121   * @param hmc Class to create.
122   * @param index Used distinguishing the object returned.
123   * @throws IOException
124   * @return Master added.
125   */
126  public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
127      final Class<? extends HMaster> hmc, final int index) throws IOException {
128    HMaster server;
129    try {
130      server = hmc.getConstructor(Configuration.class).newInstance(c);
131    } catch (InvocationTargetException ite) {
132      Throwable target = ite.getTargetException();
133      throw new RuntimeException("Failed construction of Master: " +
134        hmc.toString() + ((target.getCause() != null)?
135          target.getCause().getMessage(): ""), target);
136    } catch (Exception e) {
137      throw new IOException(e);
138    }
139    return new JVMClusterUtil.MasterThread(server, index);
140  }
141
142  private static JVMClusterUtil.MasterThread findActiveMaster(
143    List<JVMClusterUtil.MasterThread> masters) {
144    for (JVMClusterUtil.MasterThread t : masters) {
145      if (t.master.isActiveMaster()) {
146        return t;
147      }
148    }
149
150    return null;
151  }
152
153  /**
154   * Start the cluster.  Waits until there is a primary master initialized
155   * and returns its address.
156   * @param masters
157   * @param regionservers
158   * @return Address to use contacting primary master.
159   */
160  public static String startup(final List<JVMClusterUtil.MasterThread> masters,
161      final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
162    // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
163    // should probably be re-written to use actual synchronization objects, but it's ok for now
164
165    Configuration configuration = null;
166
167    if (masters == null || masters.isEmpty()) {
168      return null;
169    }
170
171    for (JVMClusterUtil.MasterThread t : masters) {
172      configuration = t.getMaster().getConfiguration();
173      t.start();
174    }
175
176    // Wait for an active master
177    //  having an active master before starting the region threads allows
178    //  then to succeed on their connection to master
179    final int startTimeout = configuration != null ? Integer.parseInt(
180        configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000;
181    waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null);
182
183    if (regionservers != null) {
184      for (JVMClusterUtil.RegionServerThread t: regionservers) {
185        t.start();
186      }
187    }
188
189    // Wait for an active master to be initialized (implies being master)
190    //  with this, when we return the cluster is complete
191    final int initTimeout = configuration != null ? Integer.parseInt(
192        configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000")) : 200000;
193    waitForEvent(initTimeout, "initialized", () -> {
194        JVMClusterUtil.MasterThread t = findActiveMaster(masters);
195        // master thread should never be null at this point, but let's keep the check anyway
196        return t != null && t.master.isInitialized();
197      }
198    );
199
200    return findActiveMaster(masters).master.getServerName().toString();
201  }
202
203  /**
204   * Utility method to wait some time for an event to occur, and then return control to the caller.
205   * @param millis How long to wait, in milliseconds.
206   * @param action The action that we are waiting for. Will be used in log message if the event
207   *               does not occur.
208   * @param check A Supplier that will be checked periodically to produce an updated true/false
209   *              result indicating if the expected event has happened or not.
210   * @throws InterruptedIOException If we are interrupted while waiting for the event.
211   * @throws RuntimeException If we reach the specified timeout while waiting for the event.
212   */
213  private static void waitForEvent(long millis, String action, Supplier<Boolean> check)
214      throws InterruptedIOException {
215    long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
216
217    while (true) {
218      if (check.get()) {
219        return;
220      }
221
222      if (System.nanoTime() > end) {
223        String msg = "Master not " + action + " after " + millis + "ms";
224        Threads.printThreadInfo(System.out, "Thread dump because: " + msg);
225        throw new RuntimeException(msg);
226      }
227
228      try {
229        Thread.sleep(100);
230      } catch (InterruptedException e) {
231        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
232      }
233    }
234
235  }
236
237  /**
238   * @param masters
239   * @param regionservers
240   */
241  public static void shutdown(final List<MasterThread> masters,
242      final List<RegionServerThread> regionservers) {
243    LOG.debug("Shutting down HBase Cluster");
244    if (masters != null) {
245      // Do backups first.
246      JVMClusterUtil.MasterThread activeMaster = null;
247      for (JVMClusterUtil.MasterThread t : masters) {
248        // Master was killed but could be still considered as active. Check first if it is stopped.
249        if (!t.master.isStopped()) {
250          if (!t.master.isActiveMaster()) {
251            try {
252              t.master.stopMaster();
253            } catch (IOException e) {
254              LOG.error("Exception occurred while stopping master", e);
255            }
256            LOG.info("Stopped backup Master {} is stopped: {}",
257                t.master.hashCode(), t.master.isStopped());
258          } else {
259            if (activeMaster != null) {
260              LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode());
261            }
262            activeMaster = t;
263            LOG.debug("Found active master hash={}, stopped={}",
264                t.master.hashCode(), t.master.isStopped());
265          }
266        }
267      }
268      // Do active after.
269      if (activeMaster != null) {
270        try {
271          activeMaster.master.shutdown();
272        } catch (IOException e) {
273          LOG.error("Exception occurred in HMaster.shutdown()", e);
274        }
275      }
276    }
277    boolean wasInterrupted = false;
278    final long maxTime = System.currentTimeMillis() + 30 * 1000;
279    if (regionservers != null) {
280      // first try nicely.
281      for (RegionServerThread t : regionservers) {
282        t.getRegionServer().stop("Shutdown requested");
283      }
284      for (RegionServerThread t : regionservers) {
285        long now = System.currentTimeMillis();
286        if (t.isAlive() && !wasInterrupted && now < maxTime) {
287          try {
288            t.join(maxTime - now);
289          } catch (InterruptedException e) {
290            LOG.info("Got InterruptedException on shutdown - " +
291                "not waiting anymore on region server ends", e);
292            wasInterrupted = true; // someone wants us to speed up.
293          }
294        }
295      }
296
297      // Let's try to interrupt the remaining threads if any.
298      for (int i = 0; i < 100; ++i) {
299        boolean atLeastOneLiveServer = false;
300        for (RegionServerThread t : regionservers) {
301          if (t.isAlive()) {
302            atLeastOneLiveServer = true;
303            try {
304              LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
305              t.join(1000);
306            } catch (InterruptedException e) {
307              wasInterrupted = true;
308            }
309          }
310        }
311        if (!atLeastOneLiveServer) break;
312        for (RegionServerThread t : regionservers) {
313          if (t.isAlive()) {
314            LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump "  +
315              "if > 3 attempts: i=" + i);
316            if (i > 3) {
317              Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
318            }
319            t.interrupt();
320          }
321        }
322      }
323    }
324
325    if (masters != null) {
326      for (JVMClusterUtil.MasterThread t : masters) {
327        while (t.master.isAlive() && !wasInterrupted) {
328          try {
329            // The below has been replaced to debug sometime hangs on end of
330            // tests.
331            // this.master.join():
332            Threads.threadDumpingIsAlive(t.master.getThread());
333          } catch(InterruptedException e) {
334            LOG.info("Got InterruptedException on shutdown - " +
335                "not waiting anymore on master ends", e);
336            wasInterrupted = true;
337          }
338        }
339      }
340    }
341    LOG.info("Shutdown of " +
342      ((masters != null) ? masters.size() : "0") + " master(s) and " +
343      ((regionservers != null) ? regionservers.size() : "0") +
344      " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
345
346    if (wasInterrupted){
347      Thread.currentThread().interrupt();
348    }
349  }
350}