001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; 021 022import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 023 024import java.io.IOException; 025import java.util.Map.Entry; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028 029import org.apache.commons.lang3.mutable.MutableBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 037import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; 038import org.apache.hadoop.hbase.ipc.CallTimeoutException; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.apache.hadoop.ipc.RemoteException; 041 042/** 043 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail 044 * feature. 045 * <p> 046 * The motivation is as follows : In case where a large number of clients try and talk to a 047 * particular region server in hbase, if the region server goes down due to network problems, we 048 * might end up in a scenario where the clients would go into a state where they all start to retry. 049 * This behavior will set off many of the threads in pretty much the same path and they all would be 050 * sleeping giving rise to a state where the client either needs to create more threads to send new 051 * requests to other hbase machines or block because the client cannot create anymore threads. 052 * <p> 053 * In most cases the clients might prefer to have a bound on the number of threads that are created 054 * in order to send requests to hbase. This would mostly result in the client thread starvation. 055 * <p> 056 * To circumvent this problem, the approach that is being taken here under is to let 1 of the many 057 * threads who are trying to contact the regionserver with connection problems and let the other 058 * threads get a {@link PreemptiveFastFailException} so that they can move on and take other 059 * requests. 060 * <p> 061 * This would give the client more flexibility on the kind of action he would want to take in cases 062 * where the regionserver is down. He can either discard the requests and send a nack upstream 063 * faster or have an application level retry or buffer the requests up so as to send them down to 064 * hbase later. 065 */ 066@InterfaceAudience.Private 067class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { 068 069 private static final Logger LOG = LoggerFactory 070 .getLogger(PreemptiveFastFailInterceptor.class); 071 072 // amount of time to wait before we consider a server to be in fast fail 073 // mode 074 protected final long fastFailThresholdMilliSec; 075 076 // Keeps track of failures when we cannot talk to a server. Helps in 077 // fast failing clients if the server is down for a long time. 078 protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<>(); 079 080 // We populate repeatedFailuresMap every time there is a failure. So, to 081 // keep it from growing unbounded, we garbage collect the failure information 082 // every cleanupInterval. 083 protected final long failureMapCleanupIntervalMilliSec; 084 085 protected volatile long lastFailureMapCleanupTimeMilliSec; 086 087 // clear failure Info. Used to clean out all entries. 088 // A safety valve, in case the client does not exit the 089 // fast fail mode for any reason. 090 private long fastFailClearingTimeMilliSec; 091 092 private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<>(); 093 094 public PreemptiveFastFailInterceptor(Configuration conf) { 095 this.fastFailThresholdMilliSec = conf.getLong( 096 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 097 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT); 098 this.failureMapCleanupIntervalMilliSec = conf.getLong( 099 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, 100 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT); 101 lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime(); 102 } 103 104 public void intercept(FastFailInterceptorContext context) 105 throws PreemptiveFastFailException { 106 context.setFailureInfo(repeatedFailuresMap.get(context.getServer())); 107 if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) { 108 // In Fast-fail mode, all but one thread will fast fail. Check 109 // if we are that one chosen thread. 110 context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context 111 .getFailureInfo())); 112 if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry 113 LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : " 114 + context.getTries()); 115 throw new PreemptiveFastFailException( 116 context.getFailureInfo().numConsecutiveFailures.get(), 117 context.getFailureInfo().timeOfFirstFailureMilliSec, 118 context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), 119 context.getGuaranteedClientSideOnly().isTrue()); 120 } 121 } 122 context.setDidTry(true); 123 } 124 125 public void handleFailure(FastFailInterceptorContext context, 126 Throwable t) throws IOException { 127 handleThrowable(t, context.getServer(), 128 context.getCouldNotCommunicateWithServer(), 129 context.getGuaranteedClientSideOnly()); 130 } 131 132 public void updateFailureInfo(FastFailInterceptorContext context) { 133 updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), 134 context.didTry(), context.getCouldNotCommunicateWithServer() 135 .booleanValue(), context.isRetryDespiteFastFailMode()); 136 } 137 138 /** 139 * Handles failures encountered when communicating with a server. 140 * 141 * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. 142 * Throws RepeatedConnectException if the client is in Fast fail mode. 143 * 144 * @param serverName 145 * @param t 146 * - the throwable to be handled. 147 * @throws PreemptiveFastFailException 148 */ 149 @VisibleForTesting 150 protected void handleFailureToServer(ServerName serverName, Throwable t) { 151 if (serverName == null || t == null) { 152 return; 153 } 154 long currentTime = EnvironmentEdgeManager.currentTime(); 155 FailureInfo fInfo = 156 computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime)); 157 fInfo.timeOfLatestAttemptMilliSec = currentTime; 158 fInfo.numConsecutiveFailures.incrementAndGet(); 159 } 160 161 public void handleThrowable(Throwable t1, ServerName serverName, 162 MutableBoolean couldNotCommunicateWithServer, 163 MutableBoolean guaranteedClientSideOnly) throws IOException { 164 Throwable t2 = ClientExceptionsUtil.translatePFFE(t1); 165 boolean isLocalException = !(t2 instanceof RemoteException); 166 167 if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2))) { 168 couldNotCommunicateWithServer.setValue(true); 169 guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); 170 handleFailureToServer(serverName, t2); 171 } 172 } 173 174 /** 175 * Occasionally cleans up unused information in repeatedFailuresMap. 176 * 177 * repeatedFailuresMap stores the failure information for all remote hosts 178 * that had failures. In order to avoid these from growing indefinitely, 179 * occassionallyCleanupFailureInformation() will clear these up once every 180 * cleanupInterval ms. 181 */ 182 protected void occasionallyCleanupFailureInformation() { 183 long now = System.currentTimeMillis(); 184 if (!(now > lastFailureMapCleanupTimeMilliSec 185 + failureMapCleanupIntervalMilliSec)) 186 return; 187 188 // remove entries that haven't been attempted in a while 189 // No synchronization needed. It is okay if multiple threads try to 190 // remove the entry again and again from a concurrent hash map. 191 StringBuilder sb = new StringBuilder(); 192 for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) { 193 if (now > entry.getValue().timeOfLatestAttemptMilliSec 194 + failureMapCleanupIntervalMilliSec) { // no recent failures 195 repeatedFailuresMap.remove(entry.getKey()); 196 } else if (now > entry.getValue().timeOfFirstFailureMilliSec 197 + this.fastFailClearingTimeMilliSec) { // been failing for a long 198 // time 199 LOG.error(entry.getKey() 200 + " been failing for a long time. clearing out." 201 + entry.getValue().toString()); 202 repeatedFailuresMap.remove(entry.getKey()); 203 } else { 204 sb.append(entry.getKey().toString()).append(" failing ") 205 .append(entry.getValue().toString()).append("\n"); 206 } 207 } 208 if (sb.length() > 0) { 209 LOG.warn("Preemptive failure enabled for : " + sb.toString()); 210 } 211 lastFailureMapCleanupTimeMilliSec = now; 212 } 213 214 /** 215 * Checks to see if we are in the Fast fail mode for requests to the server. 216 * 217 * If a client is unable to contact a server for more than 218 * fastFailThresholdMilliSec the client will get into fast fail mode. 219 * 220 * @param server 221 * @return true if the client is in fast fail mode for the server. 222 */ 223 private boolean inFastFailMode(ServerName server) { 224 FailureInfo fInfo = repeatedFailuresMap.get(server); 225 // if fInfo is null --> The server is considered good. 226 // If the server is bad, wait long enough to believe that the server is 227 // down. 228 return (fInfo != null && 229 EnvironmentEdgeManager.currentTime() > 230 (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); 231 } 232 233 /** 234 * Checks to see if the current thread is already in FastFail mode for *some* 235 * server. 236 * 237 * @return true, if the thread is already in FF mode. 238 */ 239 private boolean currentThreadInFastFailMode() { 240 return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode 241 .get().booleanValue() == true)); 242 } 243 244 /** 245 * Check to see if the client should try to connnect to the server, inspite of 246 * knowing that it is in the fast fail mode. 247 * 248 * The idea here is that we want just one client thread to be actively trying 249 * to reconnect, while all the other threads trying to reach the server will 250 * short circuit. 251 * 252 * @param fInfo 253 * @return true if the client should try to connect to the server. 254 */ 255 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { 256 // We believe that the server is down, But, we want to have just one 257 // client 258 // actively trying to connect. If we are the chosen one, we will retry 259 // and not throw an exception. 260 if (fInfo != null 261 && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) { 262 MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode 263 .get(); 264 if (threadAlreadyInFF == null) { 265 threadAlreadyInFF = new MutableBoolean(); 266 this.threadRetryingInFastFailMode.set(threadAlreadyInFF); 267 } 268 threadAlreadyInFF.setValue(true); 269 return true; 270 } else { 271 return false; 272 } 273 } 274 275 /** 276 * 277 * This function updates the Failure info for a particular server after the 278 * attempt to 279 * 280 * @param server 281 * @param fInfo 282 * @param couldNotCommunicate 283 * @param retryDespiteFastFailMode 284 */ 285 private void updateFailureInfoForServer(ServerName server, 286 FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, 287 boolean retryDespiteFastFailMode) { 288 if (server == null || fInfo == null || didTry == false) 289 return; 290 291 // If we were able to connect to the server, reset the failure 292 // information. 293 if (couldNotCommunicate == false) { 294 LOG.info("Clearing out PFFE for server " + server); 295 repeatedFailuresMap.remove(server); 296 } else { 297 // update time of last attempt 298 long currentTime = System.currentTimeMillis(); 299 fInfo.timeOfLatestAttemptMilliSec = currentTime; 300 301 // Release the lock if we were retrying inspite of FastFail 302 if (retryDespiteFastFailMode) { 303 fInfo.exclusivelyRetringInspiteOfFastFail.set(false); 304 threadRetryingInFastFailMode.get().setValue(false); 305 } 306 } 307 308 occasionallyCleanupFailureInformation(); 309 } 310 311 @Override 312 public void intercept(RetryingCallerInterceptorContext context) 313 throws PreemptiveFastFailException { 314 if (context instanceof FastFailInterceptorContext) { 315 intercept((FastFailInterceptorContext) context); 316 } 317 } 318 319 @Override 320 public void handleFailure(RetryingCallerInterceptorContext context, 321 Throwable t) throws IOException { 322 if (context instanceof FastFailInterceptorContext) { 323 handleFailure((FastFailInterceptorContext) context, t); 324 } 325 } 326 327 @Override 328 public void updateFailureInfo(RetryingCallerInterceptorContext context) { 329 if (context instanceof FastFailInterceptorContext) { 330 updateFailureInfo((FastFailInterceptorContext) context); 331 } 332 } 333 334 @Override 335 public RetryingCallerInterceptorContext createEmptyContext() { 336 return new FastFailInterceptorContext(); 337 } 338 339 protected boolean isServerInFailureMap(ServerName serverName) { 340 return this.repeatedFailuresMap.containsKey(serverName); 341 } 342 343 @Override 344 public String toString() { 345 return "PreemptiveFastFailInterceptor"; 346 } 347}