001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase; 020 021import static org.junit.Assert.fail; 022 023import java.text.MessageFormat; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * A class that provides a standard waitFor pattern 032 * See details at https://issues.apache.org/jira/browse/HBASE-7384 033 */ 034@InterfaceAudience.Private 035public final class Waiter { 036 private static final Logger LOG = LoggerFactory.getLogger(Waiter.class); 037 038 /** 039 * System property name whose value is a scale factor to increase time out values dynamically used 040 * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)}, 041 * {@link #waitFor(Configuration, long, long, Predicate)}, and 042 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method 043 * <p/> 044 * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout 045 */ 046 public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio"; 047 048 private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1; 049 050 private static float waitForRatio = -1; 051 052 private Waiter() { 053 } 054 055 /** 056 * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)}, 057 * {@link #waitFor(Configuration, long, Predicate)}, 058 * {@link #waitFor(Configuration, long, long, Predicate)} and 059 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class 060 * <p/> 061 * This is useful to dynamically adjust max time out values when same test cases run in different 062 * test machine settings without recompiling & re-deploying code. 063 * <p/> 064 * The value is obtained from the Java System property or configuration setting 065 * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>. 066 * @param conf the configuration 067 * @return the 'wait for ratio' for the current test run. 068 */ 069 public static float getWaitForRatio(Configuration conf) { 070 if (waitForRatio < 0) { 071 // System property takes precedence over configuration setting 072 if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) { 073 waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO)); 074 } else { 075 waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT); 076 } 077 } 078 return waitForRatio; 079 } 080 081 /** 082 * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and 083 * {@link Waiter#waitFor(Configuration, long, Predicate)} and 084 * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods. 085 */ 086 @InterfaceAudience.Private 087 public interface Predicate<E extends Exception> { 088 /** 089 * Perform a predicate evaluation. 090 * @return the boolean result of the evaluation. 091 * @throws E thrown if the predicate evaluation could not evaluate. 092 */ 093 boolean evaluate() throws E; 094 } 095 096 /** 097 * A mixin interface, can be used with {@link Waiter} to explain failed state. 098 */ 099 @InterfaceAudience.Private 100 public interface ExplainingPredicate<E extends Exception> extends Predicate<E> { 101 /** 102 * Perform a predicate evaluation. 103 * 104 * @return explanation of failed state 105 */ 106 String explainFailure() throws E; 107 } 108 109 /** 110 * Makes the current thread sleep for the duration equal to the specified time in milliseconds 111 * multiplied by the {@link #getWaitForRatio(Configuration)}. 112 * @param conf the configuration 113 * @param time the number of milliseconds to sleep. 114 */ 115 public static void sleep(Configuration conf, long time) { 116 try { 117 Thread.sleep((long) (getWaitForRatio(conf) * time)); 118 } catch (InterruptedException ex) { 119 LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString())); 120 } 121 } 122 123 /** 124 * Waits up to the duration equal to the specified timeout multiplied by the 125 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 126 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 127 * <code>false</code>. 128 * <p/> 129 * @param conf the configuration 130 * @param timeout the timeout in milliseconds to wait for the predicate. 131 * @param predicate the predicate to evaluate. 132 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 133 * wait is interrupted otherwise <code>-1</code> when times out 134 */ 135 public static <E extends Exception> long waitFor(Configuration conf, long timeout, 136 Predicate<E> predicate) { 137 return waitFor(conf, timeout, 100, true, predicate); 138 } 139 140 /** 141 * Waits up to the duration equal to the specified timeout multiplied by the 142 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 143 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 144 * <code>false</code>. 145 * <p/> 146 * @param conf the configuration 147 * @param timeout the max timeout in milliseconds to wait for the predicate. 148 * @param interval the interval in milliseconds to evaluate predicate. 149 * @param predicate the predicate to evaluate. 150 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 151 * wait is interrupted otherwise <code>-1</code> when times out 152 */ 153 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 154 Predicate<E> predicate) { 155 return waitFor(conf, timeout, interval, true, predicate); 156 } 157 158 /** 159 * Waits up to the duration equal to the specified timeout multiplied by the 160 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 161 * <code>true</code>, failing the test if the timeout is reached, the Predicate is still 162 * <code>false</code> and failIfTimeout is set as <code>true</code>. 163 * <p/> 164 * @param conf the configuration 165 * @param timeout the timeout in milliseconds to wait for the predicate. 166 * @param interval the interval in milliseconds to evaluate predicate. 167 * @param failIfTimeout indicates if should fail current test case when times out. 168 * @param predicate the predicate to evaluate. 169 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 170 * wait is interrupted otherwise <code>-1</code> when times out 171 */ 172 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 173 boolean failIfTimeout, Predicate<E> predicate) { 174 long started = System.currentTimeMillis(); 175 long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout); 176 long mustEnd = started + adjustedTimeout; 177 long remainderWait; 178 long sleepInterval; 179 boolean eval; 180 boolean interrupted = false; 181 182 try { 183 LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])", 184 adjustedTimeout, getWaitForRatio(conf))); 185 while (!(eval = predicate.evaluate()) 186 && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) { 187 try { 188 // handle tail case when remainder wait is less than one interval 189 sleepInterval = Math.min(remainderWait, interval); 190 Thread.sleep(sleepInterval); 191 } catch (InterruptedException e) { 192 eval = predicate.evaluate(); 193 interrupted = true; 194 break; 195 } 196 } 197 if (!eval) { 198 if (interrupted) { 199 LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec", 200 System.currentTimeMillis() - started)); 201 } else if (failIfTimeout) { 202 String msg = getExplanation(predicate); 203 fail(MessageFormat 204 .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 205 } else { 206 String msg = getExplanation(predicate); 207 LOG.warn( 208 MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 209 } 210 } 211 return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1; 212 } catch (Exception ex) { 213 throw new RuntimeException(ex); 214 } 215 } 216 217 public static String getExplanation(Predicate<?> explain) { 218 if (explain instanceof ExplainingPredicate) { 219 try { 220 return " " + ((ExplainingPredicate<?>) explain).explainFailure(); 221 } catch (Exception e) { 222 LOG.error("Failed to get explanation, ", e); 223 return e.getMessage(); 224 } 225 } else { 226 return ""; 227 } 228 } 229}