001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase;
021
022import static org.junit.Assert.fail;
023
024import java.text.MessageFormat;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * A class that provides a standard waitFor pattern
033 * See details at https://issues.apache.org/jira/browse/HBASE-7384
034 */
035@InterfaceAudience.Private
036public final class Waiter {
037
038  private static final Logger LOG = LoggerFactory.getLogger(Waiter.class);
039
040  /**
041   * System property name whose value is a scale factor to increase time out values dynamically used
042   * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
043   * {@link #waitFor(Configuration, long, long, Predicate)}, and
044   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
045   * <p/>
046   * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
047   */
048  public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
049
050  private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
051
052  private static float waitForRatio = -1;
053
054  private Waiter() {
055  }
056
057  /**
058   * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
059   * {@link #waitFor(Configuration, long, Predicate)},
060   * {@link #waitFor(Configuration, long, long, Predicate)} and
061   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
062   * <p/>
063   * This is useful to dynamically adjust max time out values when same test cases run in different
064   * test machine settings without recompiling & re-deploying code.
065   * <p/>
066   * The value is obtained from the Java System property or configuration setting
067   * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
068   * @param conf the configuration
069   * @return the 'wait for ratio' for the current test run.
070   */
071  public static float getWaitForRatio(Configuration conf) {
072    if (waitForRatio < 0) {
073      // System property takes precedence over configuration setting
074      if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
075        waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
076      } else {
077        waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
078      }
079    }
080    return waitForRatio;
081  }
082
083  /**
084   * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
085   * {@link Waiter#waitFor(Configuration, long, Predicate)} and
086   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate) methods.
087   */
088  @InterfaceAudience.Private
089  public interface Predicate<E extends Exception> {
090
091    /**
092     * Perform a predicate evaluation.
093     * @return the boolean result of the evaluation.
094     * @throws Exception thrown if the predicate evaluation could not evaluate.
095     */
096    boolean evaluate() throws E;
097
098  }
099
100  /**
101   * A mixin interface, can be used with {@link Waiter} to explain failed state.
102   */
103  @InterfaceAudience.Private
104  public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
105
106    /**
107     * Perform a predicate evaluation.
108     *
109     * @return explanation of failed state
110     */
111    String explainFailure() throws E;
112
113  }
114
115  /**
116   * Makes the current thread sleep for the duration equal to the specified time in milliseconds
117   * multiplied by the {@link #getWaitForRatio(Configuration)}.
118   * @param conf the configuration
119   * @param time the number of milliseconds to sleep.
120   */
121  public static void sleep(Configuration conf, long time) {
122    try {
123      Thread.sleep((long) (getWaitForRatio(conf) * time));
124    } catch (InterruptedException ex) {
125      LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
126    }
127  }
128
129  /**
130   * Waits up to the duration equal to the specified timeout multiplied by the
131   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
132   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
133   * <code>false</code>.
134   * <p/>
135   * @param conf the configuration
136   * @param timeout the timeout in milliseconds to wait for the predicate.
137   * @param predicate the predicate to evaluate.
138   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
139   *         wait is interrupted otherwise <code>-1</code> when times out
140   */
141  public static <E extends Exception> long waitFor(Configuration conf, long timeout,
142      Predicate<E> predicate) throws E {
143    return waitFor(conf, timeout, 100, true, predicate);
144  }
145
146  /**
147   * Waits up to the duration equal to the specified timeout multiplied by the
148   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
149   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
150   * <code>false</code>.
151   * <p/>
152   * @param conf the configuration
153   * @param timeout the max timeout in milliseconds to wait for the predicate.
154   * @param interval the interval in milliseconds to evaluate predicate.
155   * @param predicate the predicate to evaluate.
156   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
157   *         wait is interrupted otherwise <code>-1</code> when times out
158   */
159  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
160      Predicate<E> predicate) throws E {
161    return waitFor(conf, timeout, interval, true, predicate);
162  }
163
164  /**
165   * Waits up to the duration equal to the specified timeout multiplied by the
166   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
167   * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
168   * <code>false</code> and failIfTimeout is set as <code>true</code>.
169   * <p/>
170   * @param conf the configuration
171   * @param timeout the timeout in milliseconds to wait for the predicate.
172   * @param interval the interval in milliseconds to evaluate predicate.
173   * @param failIfTimeout indicates if should fail current test case when times out.
174   * @param predicate the predicate to evaluate.
175   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
176   *         wait is interrupted otherwise <code>-1</code> when times out
177   */
178  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
179      boolean failIfTimeout, Predicate<E> predicate) throws E {
180    long started = System.currentTimeMillis();
181    long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
182    long mustEnd = started + adjustedTimeout;
183    long remainderWait = 0;
184    long sleepInterval = 0;
185    Boolean eval = false;
186    Boolean interrupted = false;
187
188    try {
189      LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
190        adjustedTimeout, getWaitForRatio(conf)));
191      while (!(eval = predicate.evaluate())
192              && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
193        try {
194          // handle tail case when remainder wait is less than one interval
195          sleepInterval = (remainderWait > interval) ? interval : remainderWait;
196          Thread.sleep(sleepInterval);
197        } catch (InterruptedException e) {
198          eval = predicate.evaluate();
199          interrupted = true;
200          break;
201        }
202      }
203      if (!eval) {
204        if (interrupted) {
205          LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
206            System.currentTimeMillis() - started));
207        } else if (failIfTimeout) {
208          String msg = getExplanation(predicate);
209          fail(MessageFormat
210              .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
211        } else {
212          String msg = getExplanation(predicate);
213          LOG.warn(
214              MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
215        }
216      }
217      return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
218    } catch (Exception ex) {
219      throw new RuntimeException(ex);
220    }
221  }
222
223  public static String getExplanation(Predicate<?> explain) {
224    if (explain instanceof ExplainingPredicate) {
225      try {
226        return " " + ((ExplainingPredicate<?>) explain).explainFailure();
227      } catch (Exception e) {
228        LOG.error("Failed to get explanation, ", e);
229        return e.getMessage();
230      }
231    } else {
232      return "";
233    }
234  }
235
236}