001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.fail;
021
022import java.text.MessageFormat;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * A class that provides a standard waitFor pattern See details at
031 * https://issues.apache.org/jira/browse/HBASE-7384
032 */
033@InterfaceAudience.Private
034public final class Waiter {
035  private static final Logger LOG = LoggerFactory.getLogger(Waiter.class);
036
037  /**
038   * System property name whose value is a scale factor to increase time out values dynamically used
039   * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
040   * {@link #waitFor(Configuration, long, long, Predicate)}, and
041   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
042   * <p/>
043   * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
044   */
045  public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
046
047  private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
048
049  private static float waitForRatio = -1;
050
051  private Waiter() {
052  }
053
054  /**
055   * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
056   * {@link #waitFor(Configuration, long, Predicate)},
057   * {@link #waitFor(Configuration, long, long, Predicate)} and
058   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
059   * <p/>
060   * This is useful to dynamically adjust max time out values when same test cases run in different
061   * test machine settings without recompiling & re-deploying code.
062   * <p/>
063   * The value is obtained from the Java System property or configuration setting
064   * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
065   * @param conf the configuration
066   * @return the 'wait for ratio' for the current test run.
067   */
068  public static float getWaitForRatio(Configuration conf) {
069    if (waitForRatio < 0) {
070      // System property takes precedence over configuration setting
071      if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
072        waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
073      } else {
074        waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
075      }
076    }
077    return waitForRatio;
078  }
079
080  /**
081   * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
082   * {@link Waiter#waitFor(Configuration, long, Predicate)} and
083   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods.
084   */
085  @InterfaceAudience.Private
086  public interface Predicate<E extends Exception> {
087    /**
088     * Perform a predicate evaluation.
089     * @return the boolean result of the evaluation.
090     * @throws E thrown if the predicate evaluation could not evaluate.
091     */
092    boolean evaluate() throws E;
093  }
094
095  /**
096   * A mixin interface, can be used with {@link Waiter} to explain failed state.
097   */
098  @InterfaceAudience.Private
099  public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
100    /**
101     * Perform a predicate evaluation.
102     * @return explanation of failed state
103     */
104    String explainFailure() throws E;
105  }
106
107  /**
108   * Makes the current thread sleep for the duration equal to the specified time in milliseconds
109   * multiplied by the {@link #getWaitForRatio(Configuration)}.
110   * @param conf the configuration
111   * @param time the number of milliseconds to sleep.
112   */
113  public static void sleep(Configuration conf, long time) {
114    try {
115      Thread.sleep((long) (getWaitForRatio(conf) * time));
116    } catch (InterruptedException ex) {
117      LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
118    }
119  }
120
121  /**
122   * Waits up to the duration equal to the specified timeout multiplied by the
123   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
124   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
125   * <code>false</code>.
126   * <p/>
127   * @param conf      the configuration
128   * @param timeout   the timeout in milliseconds to wait for the predicate.
129   * @param predicate the predicate to evaluate.
130   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
131   *         wait is interrupted otherwise <code>-1</code> when times out
132   */
133  public static <E extends Exception> long waitFor(Configuration conf, long timeout,
134    Predicate<E> predicate) {
135    return waitFor(conf, timeout, 100, true, predicate);
136  }
137
138  /**
139   * Waits up to the duration equal to the specified timeout multiplied by the
140   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
141   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
142   * <code>false</code>.
143   * <p/>
144   * @param conf      the configuration
145   * @param timeout   the max timeout in milliseconds to wait for the predicate.
146   * @param interval  the interval in milliseconds to evaluate predicate.
147   * @param predicate the predicate to evaluate.
148   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
149   *         wait is interrupted otherwise <code>-1</code> when times out
150   */
151  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
152    Predicate<E> predicate) {
153    return waitFor(conf, timeout, interval, true, predicate);
154  }
155
156  /**
157   * Waits up to the duration equal to the specified timeout multiplied by the
158   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
159   * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
160   * <code>false</code> and failIfTimeout is set as <code>true</code>.
161   * <p/>
162   * @param conf          the configuration
163   * @param timeout       the timeout in milliseconds to wait for the predicate.
164   * @param interval      the interval in milliseconds to evaluate predicate.
165   * @param failIfTimeout indicates if should fail current test case when times out.
166   * @param predicate     the predicate to evaluate.
167   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
168   *         wait is interrupted otherwise <code>-1</code> when times out
169   */
170  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
171    boolean failIfTimeout, Predicate<E> predicate) {
172    long started = EnvironmentEdgeManager.currentTime();
173    long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
174    long mustEnd = started + adjustedTimeout;
175    long remainderWait;
176    long sleepInterval;
177    boolean eval;
178    boolean interrupted = false;
179
180    try {
181      LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
182        adjustedTimeout, getWaitForRatio(conf)));
183      while (
184        !(eval = predicate.evaluate())
185          && (remainderWait = mustEnd - EnvironmentEdgeManager.currentTime()) > 0
186      ) {
187        try {
188          // handle tail case when remainder wait is less than one interval
189          sleepInterval = Math.min(remainderWait, interval);
190          Thread.sleep(sleepInterval);
191        } catch (InterruptedException e) {
192          eval = predicate.evaluate();
193          interrupted = true;
194          break;
195        }
196      }
197      if (!eval) {
198        if (interrupted) {
199          LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
200            EnvironmentEdgeManager.currentTime() - started));
201        } else if (failIfTimeout) {
202          String msg = getExplanation(predicate);
203          fail(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
204        } else {
205          String msg = getExplanation(predicate);
206          LOG.warn(
207            MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
208        }
209      }
210      return (eval || interrupted) ? (EnvironmentEdgeManager.currentTime() - started) : -1;
211    } catch (Exception ex) {
212      throw new RuntimeException(ex);
213    }
214  }
215
216  public static String getExplanation(Predicate<?> explain) {
217    if (explain instanceof ExplainingPredicate) {
218      try {
219        return " " + ((ExplainingPredicate<?>) explain).explainFailure();
220      } catch (Exception e) {
221        LOG.error("Failed to get explanation, ", e);
222        return e.getMessage();
223      }
224    } else {
225      return "";
226    }
227  }
228}