001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.fail; 021 022import java.text.MessageFormat; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * A class that provides a standard waitFor pattern See details at 031 * https://issues.apache.org/jira/browse/HBASE-7384 032 */ 033@InterfaceAudience.Private 034public final class Waiter { 035 private static final Logger LOG = LoggerFactory.getLogger(Waiter.class); 036 037 /** 038 * System property name whose value is a scale factor to increase time out values dynamically used 039 * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)}, 040 * {@link #waitFor(Configuration, long, long, Predicate)}, and 041 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method 042 * <p/> 043 * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout 044 */ 045 public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio"; 046 047 private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1; 048 049 private static float waitForRatio = -1; 050 051 private Waiter() { 052 } 053 054 /** 055 * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)}, 056 * {@link #waitFor(Configuration, long, Predicate)}, 057 * {@link #waitFor(Configuration, long, long, Predicate)} and 058 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class 059 * <p/> 060 * This is useful to dynamically adjust max time out values when same test cases run in different 061 * test machine settings without recompiling & re-deploying code. 062 * <p/> 063 * The value is obtained from the Java System property or configuration setting 064 * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>. 065 * @param conf the configuration 066 * @return the 'wait for ratio' for the current test run. 067 */ 068 public static float getWaitForRatio(Configuration conf) { 069 if (waitForRatio < 0) { 070 // System property takes precedence over configuration setting 071 if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) { 072 waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO)); 073 } else { 074 waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT); 075 } 076 } 077 return waitForRatio; 078 } 079 080 /** 081 * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and 082 * {@link Waiter#waitFor(Configuration, long, Predicate)} and 083 * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods. 084 */ 085 @InterfaceAudience.Private 086 public interface Predicate<E extends Exception> { 087 /** 088 * Perform a predicate evaluation. 089 * @return the boolean result of the evaluation. 090 * @throws E thrown if the predicate evaluation could not evaluate. 091 */ 092 boolean evaluate() throws E; 093 } 094 095 /** 096 * A mixin interface, can be used with {@link Waiter} to explain failed state. 097 */ 098 @InterfaceAudience.Private 099 public interface ExplainingPredicate<E extends Exception> extends Predicate<E> { 100 /** 101 * Perform a predicate evaluation. 102 * @return explanation of failed state 103 */ 104 String explainFailure() throws E; 105 } 106 107 /** 108 * Makes the current thread sleep for the duration equal to the specified time in milliseconds 109 * multiplied by the {@link #getWaitForRatio(Configuration)}. 110 * @param conf the configuration 111 * @param time the number of milliseconds to sleep. 112 */ 113 public static void sleep(Configuration conf, long time) { 114 try { 115 Thread.sleep((long) (getWaitForRatio(conf) * time)); 116 } catch (InterruptedException ex) { 117 LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString())); 118 } 119 } 120 121 /** 122 * Waits up to the duration equal to the specified timeout multiplied by the 123 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 124 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 125 * <code>false</code>. 126 * <p/> 127 * @param conf the configuration 128 * @param timeout the timeout in milliseconds to wait for the predicate. 129 * @param predicate the predicate to evaluate. 130 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 131 * wait is interrupted otherwise <code>-1</code> when times out 132 */ 133 public static <E extends Exception> long waitFor(Configuration conf, long timeout, 134 Predicate<E> predicate) { 135 return waitFor(conf, timeout, 100, true, predicate); 136 } 137 138 /** 139 * Waits up to the duration equal to the specified timeout multiplied by the 140 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 141 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 142 * <code>false</code>. 143 * <p/> 144 * @param conf the configuration 145 * @param timeout the max timeout in milliseconds to wait for the predicate. 146 * @param interval the interval in milliseconds to evaluate predicate. 147 * @param predicate the predicate to evaluate. 148 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 149 * wait is interrupted otherwise <code>-1</code> when times out 150 */ 151 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 152 Predicate<E> predicate) { 153 return waitFor(conf, timeout, interval, true, predicate); 154 } 155 156 /** 157 * Waits up to the duration equal to the specified timeout multiplied by the 158 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 159 * <code>true</code>, failing the test if the timeout is reached, the Predicate is still 160 * <code>false</code> and failIfTimeout is set as <code>true</code>. 161 * <p/> 162 * @param conf the configuration 163 * @param timeout the timeout in milliseconds to wait for the predicate. 164 * @param interval the interval in milliseconds to evaluate predicate. 165 * @param failIfTimeout indicates if should fail current test case when times out. 166 * @param predicate the predicate to evaluate. 167 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 168 * wait is interrupted otherwise <code>-1</code> when times out 169 */ 170 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 171 boolean failIfTimeout, Predicate<E> predicate) { 172 long started = EnvironmentEdgeManager.currentTime(); 173 long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout); 174 long mustEnd = started + adjustedTimeout; 175 long remainderWait; 176 long sleepInterval; 177 boolean eval; 178 boolean interrupted = false; 179 180 try { 181 LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])", 182 adjustedTimeout, getWaitForRatio(conf))); 183 while ( 184 !(eval = predicate.evaluate()) 185 && (remainderWait = mustEnd - EnvironmentEdgeManager.currentTime()) > 0 186 ) { 187 try { 188 // handle tail case when remainder wait is less than one interval 189 sleepInterval = Math.min(remainderWait, interval); 190 Thread.sleep(sleepInterval); 191 } catch (InterruptedException e) { 192 eval = predicate.evaluate(); 193 interrupted = true; 194 break; 195 } 196 } 197 if (!eval) { 198 if (interrupted) { 199 LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec", 200 EnvironmentEdgeManager.currentTime() - started)); 201 } else if (failIfTimeout) { 202 String msg = getExplanation(predicate); 203 fail(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 204 } else { 205 String msg = getExplanation(predicate); 206 LOG.warn( 207 MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 208 } 209 } 210 return (eval || interrupted) ? (EnvironmentEdgeManager.currentTime() - started) : -1; 211 } catch (Exception ex) { 212 throw new RuntimeException(ex); 213 } 214 } 215 216 public static String getExplanation(Predicate<?> explain) { 217 if (explain instanceof ExplainingPredicate) { 218 try { 219 return " " + ((ExplainingPredicate<?>) explain).explainFailure(); 220 } catch (Exception e) { 221 LOG.error("Failed to get explanation, ", e); 222 return e.getMessage(); 223 } 224 } else { 225 return ""; 226 } 227 } 228}