001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import static org.junit.Assert.fail;
022
023import java.text.MessageFormat;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * A class that provides a standard waitFor pattern
033 * See details at https://issues.apache.org/jira/browse/HBASE-7384
034 */
035@InterfaceAudience.Private
036public final class Waiter {
037  private static final Logger LOG = LoggerFactory.getLogger(Waiter.class);
038
039  /**
040   * System property name whose value is a scale factor to increase time out values dynamically used
041   * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
042   * {@link #waitFor(Configuration, long, long, Predicate)}, and
043   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
044   * <p/>
045   * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
046   */
047  public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
048
049  private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
050
051  private static float waitForRatio = -1;
052
053  private Waiter() {
054  }
055
056  /**
057   * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
058   * {@link #waitFor(Configuration, long, Predicate)},
059   * {@link #waitFor(Configuration, long, long, Predicate)} and
060   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
061   * <p/>
062   * This is useful to dynamically adjust max time out values when same test cases run in different
063   * test machine settings without recompiling & re-deploying code.
064   * <p/>
065   * The value is obtained from the Java System property or configuration setting
066   * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
067   * @param conf the configuration
068   * @return the 'wait for ratio' for the current test run.
069   */
070  public static float getWaitForRatio(Configuration conf) {
071    if (waitForRatio < 0) {
072      // System property takes precedence over configuration setting
073      if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
074        waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
075      } else {
076        waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
077      }
078    }
079    return waitForRatio;
080  }
081
082  /**
083   * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
084   * {@link Waiter#waitFor(Configuration, long, Predicate)} and
085   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods.
086   */
087  @InterfaceAudience.Private
088  public interface Predicate<E extends Exception> {
089    /**
090     * Perform a predicate evaluation.
091     * @return the boolean result of the evaluation.
092     * @throws E thrown if the predicate evaluation could not evaluate.
093     */
094    boolean evaluate() throws E;
095  }
096
097  /**
098   * A mixin interface, can be used with {@link Waiter} to explain failed state.
099   */
100  @InterfaceAudience.Private
101  public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
102    /**
103     * Perform a predicate evaluation.
104     *
105     * @return explanation of failed state
106     */
107    String explainFailure() throws E;
108  }
109
110  /**
111   * Makes the current thread sleep for the duration equal to the specified time in milliseconds
112   * multiplied by the {@link #getWaitForRatio(Configuration)}.
113   * @param conf the configuration
114   * @param time the number of milliseconds to sleep.
115   */
116  public static void sleep(Configuration conf, long time) {
117    try {
118      Thread.sleep((long) (getWaitForRatio(conf) * time));
119    } catch (InterruptedException ex) {
120      LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
121    }
122  }
123
124  /**
125   * Waits up to the duration equal to the specified timeout multiplied by the
126   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
127   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
128   * <code>false</code>.
129   * <p/>
130   * @param conf the configuration
131   * @param timeout the timeout in milliseconds to wait for the predicate.
132   * @param predicate the predicate to evaluate.
133   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
134   *         wait is interrupted otherwise <code>-1</code> when times out
135   */
136  public static <E extends Exception> long waitFor(Configuration conf, long timeout,
137      Predicate<E> predicate) {
138    return waitFor(conf, timeout, 100, true, predicate);
139  }
140
141  /**
142   * Waits up to the duration equal to the specified timeout multiplied by the
143   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
144   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
145   * <code>false</code>.
146   * <p/>
147   * @param conf the configuration
148   * @param timeout the max timeout in milliseconds to wait for the predicate.
149   * @param interval the interval in milliseconds to evaluate predicate.
150   * @param predicate the predicate to evaluate.
151   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
152   *         wait is interrupted otherwise <code>-1</code> when times out
153   */
154  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
155      Predicate<E> predicate) {
156    return waitFor(conf, timeout, interval, true, predicate);
157  }
158
159  /**
160   * Waits up to the duration equal to the specified timeout multiplied by the
161   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
162   * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
163   * <code>false</code> and failIfTimeout is set as <code>true</code>.
164   * <p/>
165   * @param conf the configuration
166   * @param timeout the timeout in milliseconds to wait for the predicate.
167   * @param interval the interval in milliseconds to evaluate predicate.
168   * @param failIfTimeout indicates if should fail current test case when times out.
169   * @param predicate the predicate to evaluate.
170   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
171   *         wait is interrupted otherwise <code>-1</code> when times out
172   */
173  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
174      boolean failIfTimeout, Predicate<E> predicate) {
175    long started = EnvironmentEdgeManager.currentTime();
176    long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
177    long mustEnd = started + adjustedTimeout;
178    long remainderWait;
179    long sleepInterval;
180    boolean eval;
181    boolean interrupted = false;
182
183    try {
184      LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
185        adjustedTimeout, getWaitForRatio(conf)));
186      while (!(eval = predicate.evaluate())
187              && (remainderWait = mustEnd - EnvironmentEdgeManager.currentTime()) > 0) {
188        try {
189          // handle tail case when remainder wait is less than one interval
190          sleepInterval = Math.min(remainderWait, interval);
191          Thread.sleep(sleepInterval);
192        } catch (InterruptedException e) {
193          eval = predicate.evaluate();
194          interrupted = true;
195          break;
196        }
197      }
198      if (!eval) {
199        if (interrupted) {
200          LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
201            EnvironmentEdgeManager.currentTime() - started));
202        } else if (failIfTimeout) {
203          String msg = getExplanation(predicate);
204          fail(MessageFormat
205              .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
206        } else {
207          String msg = getExplanation(predicate);
208          LOG.warn(
209              MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
210        }
211      }
212      return (eval || interrupted) ? (EnvironmentEdgeManager.currentTime() - started) : -1;
213    } catch (Exception ex) {
214      throw new RuntimeException(ex);
215    }
216  }
217
218  public static String getExplanation(Predicate<?> explain) {
219    if (explain instanceof ExplainingPredicate) {
220      try {
221        return " " + ((ExplainingPredicate<?>) explain).explainFailure();
222      } catch (Exception e) {
223        LOG.error("Failed to get explanation, ", e);
224        return e.getMessage();
225      }
226    } else {
227      return "";
228    }
229  }
230}