001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import static org.junit.Assert.fail;
022
023import java.text.MessageFormat;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A class that provides a standard waitFor pattern
032 * See details at https://issues.apache.org/jira/browse/HBASE-7384
033 */
034@InterfaceAudience.Private
035public final class Waiter {
036  private static final Logger LOG = LoggerFactory.getLogger(Waiter.class);
037
038  /**
039   * System property name whose value is a scale factor to increase time out values dynamically used
040   * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
041   * {@link #waitFor(Configuration, long, long, Predicate)}, and
042   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
043   * <p/>
044   * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
045   */
046  public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
047
048  private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
049
050  private static float waitForRatio = -1;
051
052  private Waiter() {
053  }
054
055  /**
056   * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
057   * {@link #waitFor(Configuration, long, Predicate)},
058   * {@link #waitFor(Configuration, long, long, Predicate)} and
059   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
060   * <p/>
061   * This is useful to dynamically adjust max time out values when same test cases run in different
062   * test machine settings without recompiling & re-deploying code.
063   * <p/>
064   * The value is obtained from the Java System property or configuration setting
065   * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
066   * @param conf the configuration
067   * @return the 'wait for ratio' for the current test run.
068   */
069  public static float getWaitForRatio(Configuration conf) {
070    if (waitForRatio < 0) {
071      // System property takes precedence over configuration setting
072      if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
073        waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
074      } else {
075        waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
076      }
077    }
078    return waitForRatio;
079  }
080
081  /**
082   * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
083   * {@link Waiter#waitFor(Configuration, long, Predicate)} and
084   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods.
085   */
086  @InterfaceAudience.Private
087  public interface Predicate<E extends Exception> {
088    /**
089     * Perform a predicate evaluation.
090     * @return the boolean result of the evaluation.
091     * @throws E thrown if the predicate evaluation could not evaluate.
092     */
093    boolean evaluate() throws E;
094  }
095
096  /**
097   * A mixin interface, can be used with {@link Waiter} to explain failed state.
098   */
099  @InterfaceAudience.Private
100  public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
101    /**
102     * Perform a predicate evaluation.
103     *
104     * @return explanation of failed state
105     */
106    String explainFailure() throws E;
107  }
108
109  /**
110   * Makes the current thread sleep for the duration equal to the specified time in milliseconds
111   * multiplied by the {@link #getWaitForRatio(Configuration)}.
112   * @param conf the configuration
113   * @param time the number of milliseconds to sleep.
114   */
115  public static void sleep(Configuration conf, long time) {
116    try {
117      Thread.sleep((long) (getWaitForRatio(conf) * time));
118    } catch (InterruptedException ex) {
119      LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
120    }
121  }
122
123  /**
124   * Waits up to the duration equal to the specified timeout multiplied by the
125   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
126   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
127   * <code>false</code>.
128   * <p/>
129   * @param conf the configuration
130   * @param timeout the timeout in milliseconds to wait for the predicate.
131   * @param predicate the predicate to evaluate.
132   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
133   *         wait is interrupted otherwise <code>-1</code> when times out
134   */
135  public static <E extends Exception> long waitFor(Configuration conf, long timeout,
136      Predicate<E> predicate) {
137    return waitFor(conf, timeout, 100, true, predicate);
138  }
139
140  /**
141   * Waits up to the duration equal to the specified timeout multiplied by the
142   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
143   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
144   * <code>false</code>.
145   * <p/>
146   * @param conf the configuration
147   * @param timeout the max timeout in milliseconds to wait for the predicate.
148   * @param interval the interval in milliseconds to evaluate predicate.
149   * @param predicate the predicate to evaluate.
150   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
151   *         wait is interrupted otherwise <code>-1</code> when times out
152   */
153  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
154      Predicate<E> predicate) {
155    return waitFor(conf, timeout, interval, true, predicate);
156  }
157
158  /**
159   * Waits up to the duration equal to the specified timeout multiplied by the
160   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
161   * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
162   * <code>false</code> and failIfTimeout is set as <code>true</code>.
163   * <p/>
164   * @param conf the configuration
165   * @param timeout the timeout in milliseconds to wait for the predicate.
166   * @param interval the interval in milliseconds to evaluate predicate.
167   * @param failIfTimeout indicates if should fail current test case when times out.
168   * @param predicate the predicate to evaluate.
169   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
170   *         wait is interrupted otherwise <code>-1</code> when times out
171   */
172  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
173      boolean failIfTimeout, Predicate<E> predicate) {
174    long started = System.currentTimeMillis();
175    long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
176    long mustEnd = started + adjustedTimeout;
177    long remainderWait;
178    long sleepInterval;
179    boolean eval;
180    boolean interrupted = false;
181
182    try {
183      LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
184        adjustedTimeout, getWaitForRatio(conf)));
185      while (!(eval = predicate.evaluate())
186              && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
187        try {
188          // handle tail case when remainder wait is less than one interval
189          sleepInterval = Math.min(remainderWait, interval);
190          Thread.sleep(sleepInterval);
191        } catch (InterruptedException e) {
192          eval = predicate.evaluate();
193          interrupted = true;
194          break;
195        }
196      }
197      if (!eval) {
198        if (interrupted) {
199          LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
200            System.currentTimeMillis() - started));
201        } else if (failIfTimeout) {
202          String msg = getExplanation(predicate);
203          fail(MessageFormat
204              .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
205        } else {
206          String msg = getExplanation(predicate);
207          LOG.warn(
208              MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
209        }
210      }
211      return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
212    } catch (Exception ex) {
213      throw new RuntimeException(ex);
214    }
215  }
216
217  public static String getExplanation(Predicate<?> explain) {
218    if (explain instanceof ExplainingPredicate) {
219      try {
220        return " " + ((ExplainingPredicate<?>) explain).explainFailure();
221      } catch (Exception e) {
222        LOG.error("Failed to get explanation, ", e);
223        return e.getMessage();
224      }
225    } else {
226      return "";
227    }
228  }
229}