001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 023 024import java.io.IOException; 025import java.util.Map.Entry; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028 029import org.apache.commons.lang3.mutable.MutableBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 037import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; 038import org.apache.hadoop.hbase.ipc.CallTimeoutException; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.apache.hadoop.ipc.RemoteException; 041 042/** 043 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail 044 * feature. 045 * <p> 046 * The motivation is as follows : In case where a large number of clients try and talk to a 047 * particular region server in hbase, if the region server goes down due to network problems, we 048 * might end up in a scenario where the clients would go into a state where they all start to retry. 049 * This behavior will set off many of the threads in pretty much the same path and they all would be 050 * sleeping giving rise to a state where the client either needs to create more threads to send new 051 * requests to other hbase machines or block because the client cannot create anymore threads. 052 * <p> 053 * In most cases the clients might prefer to have a bound on the number of threads that are created 054 * in order to send requests to hbase. This would mostly result in the client thread starvation. 055 * <p> 056 * To circumvent this problem, the approach that is being taken here under is to let 1 of the many 057 * threads who are trying to contact the regionserver with connection problems and let the other 058 * threads get a {@link PreemptiveFastFailException} so that they can move on and take other 059 * requests. 060 * <p> 061 * This would give the client more flexibility on the kind of action he would want to take in cases 062 * where the regionserver is down. He can either discard the requests and send a nack upstream 063 * faster or have an application level retry or buffer the requests up so as to send them down to 064 * hbase later. 065 */ 066@InterfaceAudience.Private 067class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { 068 069 private static final Logger LOG = LoggerFactory 070 .getLogger(PreemptiveFastFailInterceptor.class); 071 072 // amount of time to wait before we consider a server to be in fast fail 073 // mode 074 protected final long fastFailThresholdMilliSec; 075 076 // Keeps track of failures when we cannot talk to a server. Helps in 077 // fast failing clients if the server is down for a long time. 078 protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<>(); 079 080 // We populate repeatedFailuresMap every time there is a failure. So, to 081 // keep it from growing unbounded, we garbage collect the failure information 082 // every cleanupInterval. 083 protected final long failureMapCleanupIntervalMilliSec; 084 085 protected volatile long lastFailureMapCleanupTimeMilliSec; 086 087 // clear failure Info. Used to clean out all entries. 088 // A safety valve, in case the client does not exit the 089 // fast fail mode for any reason. 090 private long fastFailClearingTimeMilliSec; 091 092 private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<>(); 093 094 public PreemptiveFastFailInterceptor(Configuration conf) { 095 this.fastFailThresholdMilliSec = conf.getLong( 096 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 097 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT); 098 this.failureMapCleanupIntervalMilliSec = conf.getLong( 099 HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS, 100 HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS_DEFAULT); 101 this.fastFailClearingTimeMilliSec = conf.getLong( 102 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, 103 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT); 104 lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime(); 105 } 106 107 public void intercept(FastFailInterceptorContext context) 108 throws PreemptiveFastFailException { 109 context.setFailureInfo(repeatedFailuresMap.get(context.getServer())); 110 if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) { 111 // In Fast-fail mode, all but one thread will fast fail. Check 112 // if we are that one chosen thread. 113 context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context 114 .getFailureInfo())); 115 if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry 116 LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : " 117 + context.getTries()); 118 throw new PreemptiveFastFailException( 119 context.getFailureInfo().numConsecutiveFailures.get(), 120 context.getFailureInfo().timeOfFirstFailureMilliSec, 121 context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), 122 context.getGuaranteedClientSideOnly().isTrue()); 123 } 124 } 125 context.setDidTry(true); 126 } 127 128 public void handleFailure(FastFailInterceptorContext context, 129 Throwable t) throws IOException { 130 handleThrowable(t, context.getServer(), 131 context.getCouldNotCommunicateWithServer(), 132 context.getGuaranteedClientSideOnly()); 133 } 134 135 public void updateFailureInfo(FastFailInterceptorContext context) { 136 updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), 137 context.didTry(), context.getCouldNotCommunicateWithServer() 138 .booleanValue(), context.isRetryDespiteFastFailMode()); 139 } 140 141 /** 142 * Handles failures encountered when communicating with a server. 143 * 144 * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. 145 * Throws RepeatedConnectException if the client is in Fast fail mode. 146 * 147 * @param serverName 148 * @param t 149 * - the throwable to be handled. 150 * @throws PreemptiveFastFailException 151 */ 152 @VisibleForTesting 153 protected void handleFailureToServer(ServerName serverName, Throwable t) { 154 if (serverName == null || t == null) { 155 return; 156 } 157 long currentTime = EnvironmentEdgeManager.currentTime(); 158 FailureInfo fInfo = 159 computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime)); 160 fInfo.timeOfLatestAttemptMilliSec = currentTime; 161 fInfo.numConsecutiveFailures.incrementAndGet(); 162 } 163 164 public void handleThrowable(Throwable t1, ServerName serverName, 165 MutableBoolean couldNotCommunicateWithServer, 166 MutableBoolean guaranteedClientSideOnly) throws IOException { 167 Throwable t2 = ClientExceptionsUtil.translatePFFE(t1); 168 boolean isLocalException = !(t2 instanceof RemoteException); 169 170 if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2))) { 171 couldNotCommunicateWithServer.setValue(true); 172 guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); 173 handleFailureToServer(serverName, t2); 174 } 175 } 176 177 /** 178 * Occasionally cleans up unused information in repeatedFailuresMap. 179 * 180 * repeatedFailuresMap stores the failure information for all remote hosts 181 * that had failures. In order to avoid these from growing indefinitely, 182 * occassionallyCleanupFailureInformation() will clear these up once every 183 * cleanupInterval ms. 184 */ 185 protected void occasionallyCleanupFailureInformation() { 186 long now = System.currentTimeMillis(); 187 if (!(now > lastFailureMapCleanupTimeMilliSec 188 + failureMapCleanupIntervalMilliSec)) 189 return; 190 191 // remove entries that haven't been attempted in a while 192 // No synchronization needed. It is okay if multiple threads try to 193 // remove the entry again and again from a concurrent hash map. 194 StringBuilder sb = new StringBuilder(); 195 for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) { 196 if (now > entry.getValue().timeOfLatestAttemptMilliSec 197 + failureMapCleanupIntervalMilliSec) { // no recent failures 198 repeatedFailuresMap.remove(entry.getKey()); 199 } else if (now > entry.getValue().timeOfFirstFailureMilliSec 200 + this.fastFailClearingTimeMilliSec) { // been failing for a long 201 // time 202 LOG.error(entry.getKey() 203 + " been failing for a long time. clearing out." 204 + entry.getValue().toString()); 205 repeatedFailuresMap.remove(entry.getKey()); 206 } else { 207 sb.append(entry.getKey().toString()).append(" failing ") 208 .append(entry.getValue().toString()).append("\n"); 209 } 210 } 211 if (sb.length() > 0) { 212 LOG.warn("Preemptive failure enabled for : " + sb.toString()); 213 } 214 lastFailureMapCleanupTimeMilliSec = now; 215 } 216 217 /** 218 * Checks to see if we are in the Fast fail mode for requests to the server. 219 * 220 * If a client is unable to contact a server for more than 221 * fastFailThresholdMilliSec the client will get into fast fail mode. 222 * 223 * @param server 224 * @return true if the client is in fast fail mode for the server. 225 */ 226 private boolean inFastFailMode(ServerName server) { 227 FailureInfo fInfo = repeatedFailuresMap.get(server); 228 // if fInfo is null --> The server is considered good. 229 // If the server is bad, wait long enough to believe that the server is 230 // down. 231 return (fInfo != null && 232 EnvironmentEdgeManager.currentTime() > 233 (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); 234 } 235 236 /** 237 * Checks to see if the current thread is already in FastFail mode for *some* 238 * server. 239 * 240 * @return true, if the thread is already in FF mode. 241 */ 242 private boolean currentThreadInFastFailMode() { 243 return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode 244 .get().booleanValue() == true)); 245 } 246 247 /** 248 * Check to see if the client should try to connnect to the server, inspite of 249 * knowing that it is in the fast fail mode. 250 * 251 * The idea here is that we want just one client thread to be actively trying 252 * to reconnect, while all the other threads trying to reach the server will 253 * short circuit. 254 * 255 * @param fInfo 256 * @return true if the client should try to connect to the server. 257 */ 258 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { 259 // We believe that the server is down, But, we want to have just one 260 // client 261 // actively trying to connect. If we are the chosen one, we will retry 262 // and not throw an exception. 263 if (fInfo != null 264 && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) { 265 MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode 266 .get(); 267 if (threadAlreadyInFF == null) { 268 threadAlreadyInFF = new MutableBoolean(); 269 this.threadRetryingInFastFailMode.set(threadAlreadyInFF); 270 } 271 threadAlreadyInFF.setValue(true); 272 return true; 273 } else { 274 return false; 275 } 276 } 277 278 /** 279 * 280 * This function updates the Failure info for a particular server after the 281 * attempt to 282 * 283 * @param server 284 * @param fInfo 285 * @param couldNotCommunicate 286 * @param retryDespiteFastFailMode 287 */ 288 private void updateFailureInfoForServer(ServerName server, 289 FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, 290 boolean retryDespiteFastFailMode) { 291 if (server == null || fInfo == null || didTry == false) 292 return; 293 294 // If we were able to connect to the server, reset the failure 295 // information. 296 if (couldNotCommunicate == false) { 297 LOG.info("Clearing out PFFE for server " + server); 298 repeatedFailuresMap.remove(server); 299 } else { 300 // update time of last attempt 301 long currentTime = System.currentTimeMillis(); 302 fInfo.timeOfLatestAttemptMilliSec = currentTime; 303 304 // Release the lock if we were retrying inspite of FastFail 305 if (retryDespiteFastFailMode) { 306 fInfo.exclusivelyRetringInspiteOfFastFail.set(false); 307 threadRetryingInFastFailMode.get().setValue(false); 308 } 309 } 310 311 occasionallyCleanupFailureInformation(); 312 } 313 314 @Override 315 public void intercept(RetryingCallerInterceptorContext context) 316 throws PreemptiveFastFailException { 317 if (context instanceof FastFailInterceptorContext) { 318 intercept((FastFailInterceptorContext) context); 319 } 320 } 321 322 @Override 323 public void handleFailure(RetryingCallerInterceptorContext context, 324 Throwable t) throws IOException { 325 if (context instanceof FastFailInterceptorContext) { 326 handleFailure((FastFailInterceptorContext) context, t); 327 } 328 } 329 330 @Override 331 public void updateFailureInfo(RetryingCallerInterceptorContext context) { 332 if (context instanceof FastFailInterceptorContext) { 333 updateFailureInfo((FastFailInterceptorContext) context); 334 } 335 } 336 337 @Override 338 public RetryingCallerInterceptorContext createEmptyContext() { 339 return new FastFailInterceptorContext(); 340 } 341 342 protected boolean isServerInFailureMap(ServerName serverName) { 343 return this.repeatedFailuresMap.containsKey(serverName); 344 } 345 346 @Override 347 public String toString() { 348 return "PreemptiveFastFailInterceptor"; 349 } 350}