001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.InterruptedIOException;
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.lang.reflect.InvocationTargetException;
025import java.util.List;
026import java.util.concurrent.TimeUnit;
027import java.util.function.Supplier;
028
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.master.HMaster;
035import org.apache.hadoop.hbase.regionserver.HRegionServer;
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038/**
 * Utility used for running a whole cluster inside a single JVM.
040 */
041@InterfaceAudience.Private
042public class JVMClusterUtil {
043  private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class);
044
045  /**
046   * Datastructure to hold RegionServer Thread and RegionServer instance
047   */
048  public static class RegionServerThread extends Thread {
049    private final HRegionServer regionServer;
050
051    public RegionServerThread(final HRegionServer r, final int index) {
052      super(r, "RS:" + index + ";" + r.getServerName().toShortString());
053      this.regionServer = r;
054    }
055
056    /** @return the region server */
057    public HRegionServer getRegionServer() {
058      return this.regionServer;
059    }
060
061    /**
062     * Block until the region server has come online, indicating it is ready
063     * to be used.
064     */
065    public void waitForServerOnline() {
066      // The server is marked online after the init method completes inside of
067      // the HRS#run method.  HRS#init can fail for whatever region.  In those
068      // cases, we'll jump out of the run without setting online flag.  Check
069      // stopRequested so we don't wait here a flag that will never be flipped.
070      regionServer.waitForServerOnline();
071    }
072  }
073
074  /**
075   * Creates a {@link RegionServerThread}.
076   * Call 'start' on the returned thread to make it run.
077   * @param c Configuration to use.
078   * @param hrsc Class to create.
079   * @param index Used distinguishing the object returned.
080   * @throws IOException
081   * @return Region server added.
082   */
083  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
084      final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
085    HRegionServer server;
086    try {
087      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
088      ctor.setAccessible(true);
089      server = ctor.newInstance(c);
090    } catch (InvocationTargetException ite) {
091      Throwable target = ite.getTargetException();
092      throw new RuntimeException("Failed construction of RegionServer: " +
093        hrsc.toString() + ((target.getCause() != null)?
094          target.getCause().getMessage(): ""), target);
095    } catch (Exception e) {
096      throw new IOException(e);
097    }
098    return new JVMClusterUtil.RegionServerThread(server, index);
099  }
100
101
102  /**
103   * Datastructure to hold Master Thread and Master instance
104   */
105  public static class MasterThread extends Thread {
106    private final HMaster master;
107
108    public MasterThread(final HMaster m, final int index) {
109      super(m, "M:" + index + ";" + m.getServerName().toShortString());
110      this.master = m;
111    }
112
113    /** @return the master */
114    public HMaster getMaster() {
115      return this.master;
116    }
117  }
118
119  /**
120   * Creates a {@link MasterThread}.
121   * Call 'start' on the returned thread to make it run.
122   * @param c Configuration to use.
123   * @param hmc Class to create.
124   * @param index Used distinguishing the object returned.
125   * @throws IOException
126   * @return Master added.
127   */
128  public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
129      final Class<? extends HMaster> hmc, final int index) throws IOException {
130    HMaster server;
131    try {
132      server = hmc.getConstructor(Configuration.class).newInstance(c);
133    } catch (InvocationTargetException ite) {
134      Throwable target = ite.getTargetException();
135      throw new RuntimeException("Failed construction of Master: " +
136        hmc.toString() + ((target.getCause() != null)?
137          target.getCause().getMessage(): ""), target);
138    } catch (Exception e) {
139      throw new IOException(e);
140    }
141    // Needed if a master based registry is configured for internal cluster connections. Here, we
142    // just add the current master host port since we do not know other master addresses up front
143    // in mini cluster tests.
144    c.set(HConstants.MASTER_ADDRS_KEY,
145        Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
146    return new JVMClusterUtil.MasterThread(server, index);
147  }
148
149  private static JVMClusterUtil.MasterThread findActiveMaster(
150    List<JVMClusterUtil.MasterThread> masters) {
151    for (JVMClusterUtil.MasterThread t : masters) {
152      if (t.master.isActiveMaster()) {
153        return t;
154      }
155    }
156
157    return null;
158  }
159
160  /**
161   * Start the cluster.  Waits until there is a primary master initialized
162   * and returns its address.
163   * @param masters
164   * @param regionservers
165   * @return Address to use contacting primary master.
166   */
167  public static String startup(final List<JVMClusterUtil.MasterThread> masters,
168      final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
169    // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
170    // should probably be re-written to use actual synchronization objects, but it's ok for now
171
172    Configuration configuration = null;
173
174    if (masters == null || masters.isEmpty()) {
175      return null;
176    }
177
178    for (JVMClusterUtil.MasterThread t : masters) {
179      configuration = t.getMaster().getConfiguration();
180      t.start();
181    }
182
183    // Wait for an active master
184    //  having an active master before starting the region threads allows
185    //  then to succeed on their connection to master
186    final int startTimeout = configuration != null ? Integer.parseInt(
187        configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000;
188    waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null);
189
190    if (regionservers != null) {
191      for (JVMClusterUtil.RegionServerThread t: regionservers) {
192        t.start();
193      }
194    }
195
196    // Wait for an active master to be initialized (implies being master)
197    //  with this, when we return the cluster is complete
198    final int initTimeout = configuration != null ? Integer.parseInt(
199        configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000")) : 200000;
200    waitForEvent(initTimeout, "initialized", () -> {
201        JVMClusterUtil.MasterThread t = findActiveMaster(masters);
202        // master thread should never be null at this point, but let's keep the check anyway
203        return t != null && t.master.isInitialized();
204      }
205    );
206
207    return findActiveMaster(masters).master.getServerName().toString();
208  }
209
210  /**
211   * Utility method to wait some time for an event to occur, and then return control to the caller.
212   * @param millis How long to wait, in milliseconds.
213   * @param action The action that we are waiting for. Will be used in log message if the event
214   *               does not occur.
215   * @param check A Supplier that will be checked periodically to produce an updated true/false
216   *              result indicating if the expected event has happened or not.
217   * @throws InterruptedIOException If we are interrupted while waiting for the event.
218   * @throws RuntimeException If we reach the specified timeout while waiting for the event.
219   */
220  private static void waitForEvent(long millis, String action, Supplier<Boolean> check)
221      throws InterruptedIOException {
222    long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
223
224    while (true) {
225      if (check.get()) {
226        return;
227      }
228
229      if (System.nanoTime() > end) {
230        String msg = "Master not " + action + " after " + millis + "ms";
231        Threads.printThreadInfo(System.out, "Thread dump because: " + msg);
232        throw new RuntimeException(msg);
233      }
234
235      try {
236        Thread.sleep(100);
237      } catch (InterruptedException e) {
238        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
239      }
240    }
241
242  }
243
244  /**
245   * @param masters
246   * @param regionservers
247   */
248  public static void shutdown(final List<MasterThread> masters,
249      final List<RegionServerThread> regionservers) {
250    LOG.debug("Shutting down HBase Cluster");
251    if (masters != null) {
252      // Do backups first.
253      JVMClusterUtil.MasterThread activeMaster = null;
254      for (JVMClusterUtil.MasterThread t : masters) {
255        // Master was killed but could be still considered as active. Check first if it is stopped.
256        if (!t.master.isStopped()) {
257          if (!t.master.isActiveMaster()) {
258            try {
259              t.master.stopMaster();
260            } catch (IOException e) {
261              LOG.error("Exception occurred while stopping master", e);
262            }
263            LOG.info("Stopped backup Master {} is stopped: {}",
264                t.master.hashCode(), t.master.isStopped());
265          } else {
266            if (activeMaster != null) {
267              LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode());
268            }
269            activeMaster = t;
270            LOG.debug("Found active master hash={}, stopped={}",
271                t.master.hashCode(), t.master.isStopped());
272          }
273        }
274      }
275      // Do active after.
276      if (activeMaster != null) {
277        try {
278          activeMaster.master.shutdown();
279        } catch (IOException e) {
280          LOG.error("Exception occurred in HMaster.shutdown()", e);
281        }
282      }
283    }
284    boolean wasInterrupted = false;
285    final long maxTime = System.currentTimeMillis() + 30 * 1000;
286    if (regionservers != null) {
287      // first try nicely.
288      for (RegionServerThread t : regionservers) {
289        t.getRegionServer().stop("Shutdown requested");
290      }
291      for (RegionServerThread t : regionservers) {
292        long now = System.currentTimeMillis();
293        if (t.isAlive() && !wasInterrupted && now < maxTime) {
294          try {
295            t.join(maxTime - now);
296          } catch (InterruptedException e) {
297            LOG.info("Got InterruptedException on shutdown - " +
298                "not waiting anymore on region server ends", e);
299            wasInterrupted = true; // someone wants us to speed up.
300          }
301        }
302      }
303
304      // Let's try to interrupt the remaining threads if any.
305      for (int i = 0; i < 100; ++i) {
306        boolean atLeastOneLiveServer = false;
307        for (RegionServerThread t : regionservers) {
308          if (t.isAlive()) {
309            atLeastOneLiveServer = true;
310            try {
311              LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
312              t.join(1000);
313            } catch (InterruptedException e) {
314              wasInterrupted = true;
315            }
316          }
317        }
318        if (!atLeastOneLiveServer) break;
319        for (RegionServerThread t : regionservers) {
320          if (t.isAlive()) {
321            LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump "  +
322              "if > 3 attempts: i=" + i);
323            if (i > 3) {
324              Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
325            }
326            t.interrupt();
327          }
328        }
329      }
330    }
331
332    if (masters != null) {
333      for (JVMClusterUtil.MasterThread t : masters) {
334        while (t.master.isAlive() && !wasInterrupted) {
335          try {
336            // The below has been replaced to debug sometime hangs on end of
337            // tests.
338            // this.master.join():
339            Threads.threadDumpingIsAlive(t.master);
340          } catch(InterruptedException e) {
341            LOG.info("Got InterruptedException on shutdown - " +
342                "not waiting anymore on master ends", e);
343            wasInterrupted = true;
344          }
345        }
346      }
347    }
348    LOG.info("Shutdown of " +
349      ((masters != null) ? masters.size() : "0") + " master(s) and " +
350      ((regionservers != null) ? regionservers.size() : "0") +
351      " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
352
353    if (wasInterrupted){
354      Thread.currentThread().interrupt();
355    }
356  }
357}