001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.InvocationTargetException;
024import java.util.List;
025import java.util.concurrent.TimeUnit;
026import java.util.function.Supplier;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
036
037/**
038 * Utility used running a cluster all in the one JVM.
039 */
040@InterfaceAudience.Private
041public class JVMClusterUtil {
042  private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class);
043
044  /**
045   * Datastructure to hold RegionServer Thread and RegionServer instance
046   */
047  public static class RegionServerThread extends Thread {
048    private final HRegionServer regionServer;
049
050    public RegionServerThread(final HRegionServer r, final int index) {
051      super(r, "RS:" + index + ";" + r.getServerName().toShortString());
052      this.regionServer = r;
053    }
054
055    /** Returns the region server */
056    public HRegionServer getRegionServer() {
057      return this.regionServer;
058    }
059
060    /**
061     * Block until the region server has come online, indicating it is ready to be used.
062     */
063    public void waitForServerOnline() {
064      // The server is marked online after the init method completes inside of
065      // the HRS#run method. HRS#init can fail for whatever region. In those
066      // cases, we'll jump out of the run without setting online flag. Check
067      // stopRequested so we don't wait here a flag that will never be flipped.
068      regionServer.waitForServerOnline();
069    }
070  }
071
072  /**
073   * Creates a {@link RegionServerThread}. Call 'start' on the returned thread to make it run.
074   * @param c     Configuration to use.
075   * @param hrsc  Class to create.
076   * @param index Used distinguishing the object returned. n * @return Region server added.
077   */
078  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
079    final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
080    HRegionServer server;
081    try {
082      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
083      ctor.setAccessible(true);
084      server = ctor.newInstance(c);
085    } catch (InvocationTargetException ite) {
086      Throwable target = ite.getTargetException();
087      throw new RuntimeException("Failed construction of RegionServer: " + hrsc.toString()
088        + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target);
089    } catch (Exception e) {
090      throw new IOException(e);
091    }
092    return new JVMClusterUtil.RegionServerThread(server, index);
093  }
094
095  /**
096   * Datastructure to hold Master Thread and Master instance
097   */
098  public static class MasterThread extends Thread {
099    private final HMaster master;
100
101    public MasterThread(final HMaster m, final int index) {
102      super(m, "M:" + index + ";" + m.getServerName().toShortString());
103      this.master = m;
104    }
105
106    /** Returns the master */
107    public HMaster getMaster() {
108      return this.master;
109    }
110  }
111
112  /**
113   * Creates a {@link MasterThread}. Call 'start' on the returned thread to make it run.
114   * @param c     Configuration to use.
115   * @param hmc   Class to create.
116   * @param index Used distinguishing the object returned. n * @return Master added.
117   */
118  public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
119    final Class<? extends HMaster> hmc, final int index) throws IOException {
120    HMaster server;
121    try {
122      server = hmc.getConstructor(Configuration.class).newInstance(c);
123    } catch (InvocationTargetException ite) {
124      Throwable target = ite.getTargetException();
125      throw new RuntimeException("Failed construction of Master: " + hmc.toString()
126        + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target);
127    } catch (Exception e) {
128      throw new IOException(e);
129    }
130    // Needed if a master based registry is configured for internal cluster connections. Here, we
131    // just add the current master host port since we do not know other master addresses up front
132    // in mini cluster tests.
133    c.set(HConstants.MASTER_ADDRS_KEY,
134      Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
135    return new JVMClusterUtil.MasterThread(server, index);
136  }
137
138  private static JVMClusterUtil.MasterThread
139    findActiveMaster(List<JVMClusterUtil.MasterThread> masters) {
140    for (JVMClusterUtil.MasterThread t : masters) {
141      if (t.master.isActiveMaster()) {
142        return t;
143      }
144    }
145
146    return null;
147  }
148
149  /**
150   * Start the cluster. Waits until there is a primary master initialized and returns its address.
151   * nn * @return Address to use contacting primary master.
152   */
153  public static String startup(final List<JVMClusterUtil.MasterThread> masters,
154    final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
155    // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
156    // should probably be re-written to use actual synchronization objects, but it's ok for now
157
158    Configuration configuration = null;
159
160    if (masters == null || masters.isEmpty()) {
161      return null;
162    }
163
164    for (JVMClusterUtil.MasterThread t : masters) {
165      configuration = t.getMaster().getConfiguration();
166      t.start();
167    }
168
169    // Wait for an active master
170    // having an active master before starting the region threads allows
171    // then to succeed on their connection to master
172    final int startTimeout = configuration != null
173      ? Integer.parseInt(configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000"))
174      : 30000;
175    waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null);
176
177    if (regionservers != null) {
178      for (JVMClusterUtil.RegionServerThread t : regionservers) {
179        t.start();
180      }
181    }
182
183    // Wait for an active master to be initialized (implies being master)
184    // with this, when we return the cluster is complete
185    final int initTimeout = configuration != null
186      ? Integer.parseInt(configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000"))
187      : 200000;
188    waitForEvent(initTimeout, "initialized", () -> {
189      JVMClusterUtil.MasterThread t = findActiveMaster(masters);
190      // master thread should never be null at this point, but let's keep the check anyway
191      return t != null && t.master.isInitialized();
192    });
193
194    return findActiveMaster(masters).master.getServerName().toString();
195  }
196
197  /**
198   * Utility method to wait some time for an event to occur, and then return control to the caller.
199   * @param millis How long to wait, in milliseconds.
200   * @param action The action that we are waiting for. Will be used in log message if the event does
201   *               not occur.
202   * @param check  A Supplier that will be checked periodically to produce an updated true/false
203   *               result indicating if the expected event has happened or not.
204   * @throws InterruptedIOException If we are interrupted while waiting for the event.
205   * @throws RuntimeException       If we reach the specified timeout while waiting for the event.
206   */
207  private static void waitForEvent(long millis, String action, Supplier<Boolean> check)
208    throws InterruptedIOException {
209    long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
210
211    while (true) {
212      if (check.get()) {
213        return;
214      }
215
216      if (System.nanoTime() > end) {
217        String msg = "Master not " + action + " after " + millis + "ms";
218        Threads.printThreadInfo(System.out, "Thread dump because: " + msg);
219        throw new RuntimeException(msg);
220      }
221
222      try {
223        Thread.sleep(100);
224      } catch (InterruptedException e) {
225        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
226      }
227    }
228
229  }
230
231  /**
232   * nn
233   */
234  public static void shutdown(final List<MasterThread> masters,
235    final List<RegionServerThread> regionservers) {
236    LOG.debug("Shutting down HBase Cluster");
237    if (masters != null) {
238      // Do backups first.
239      JVMClusterUtil.MasterThread activeMaster = null;
240      for (JVMClusterUtil.MasterThread t : masters) {
241        // Master was killed but could be still considered as active. Check first if it is stopped.
242        if (!t.master.isStopped()) {
243          if (!t.master.isActiveMaster()) {
244            try {
245              t.master.stopMaster();
246            } catch (IOException e) {
247              LOG.error("Exception occurred while stopping master", e);
248            }
249            LOG.info("Stopped backup Master {} is stopped: {}", t.master.hashCode(),
250              t.master.isStopped());
251          } else {
252            if (activeMaster != null) {
253              LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode());
254            }
255            activeMaster = t;
256            LOG.debug("Found active master hash={}, stopped={}", t.master.hashCode(),
257              t.master.isStopped());
258          }
259        }
260      }
261      // Do active after.
262      if (activeMaster != null) {
263        try {
264          activeMaster.master.shutdown();
265        } catch (IOException e) {
266          LOG.error("Exception occurred in HMaster.shutdown()", e);
267        }
268      }
269    }
270    boolean wasInterrupted = false;
271    final long maxTime = EnvironmentEdgeManager.currentTime() + 30 * 1000;
272    if (regionservers != null) {
273      // first try nicely.
274      for (RegionServerThread t : regionservers) {
275        t.getRegionServer().stop("Shutdown requested");
276      }
277      for (RegionServerThread t : regionservers) {
278        long now = EnvironmentEdgeManager.currentTime();
279        if (t.isAlive() && !wasInterrupted && now < maxTime) {
280          try {
281            t.join(maxTime - now);
282          } catch (InterruptedException e) {
283            LOG.info("Got InterruptedException on shutdown - "
284              + "not waiting anymore on region server ends", e);
285            wasInterrupted = true; // someone wants us to speed up.
286          }
287        }
288      }
289
290      // Let's try to interrupt the remaining threads if any.
291      for (int i = 0; i < 100; ++i) {
292        boolean atLeastOneLiveServer = false;
293        for (RegionServerThread t : regionservers) {
294          if (t.isAlive()) {
295            atLeastOneLiveServer = true;
296            try {
297              LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
298              t.join(1000);
299            } catch (InterruptedException e) {
300              wasInterrupted = true;
301            }
302          }
303        }
304        if (!atLeastOneLiveServer) break;
305        for (RegionServerThread t : regionservers) {
306          if (t.isAlive()) {
307            LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump "
308              + "if > 3 attempts: i=" + i);
309            if (i > 3) {
310              Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
311            }
312            t.interrupt();
313          }
314        }
315      }
316    }
317
318    if (masters != null) {
319      for (JVMClusterUtil.MasterThread t : masters) {
320        while (t.master.isAlive() && !wasInterrupted) {
321          try {
322            // The below has been replaced to debug sometime hangs on end of
323            // tests.
324            // this.master.join():
325            Threads.threadDumpingIsAlive(t.master);
326          } catch (InterruptedException e) {
327            LOG.info(
328              "Got InterruptedException on shutdown - " + "not waiting anymore on master ends", e);
329            wasInterrupted = true;
330          }
331        }
332      }
333    }
334    LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and "
335      + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) "
336      + (wasInterrupted ? "interrupted" : "complete"));
337
338    if (wasInterrupted) {
339      Thread.currentThread().interrupt();
340    }
341  }
342}