001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.io.IOException; 023import java.util.Map.Entry; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import org.apache.commons.lang3.mutable.MutableBoolean; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 032import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; 033import org.apache.hadoop.hbase.ipc.CallTimeoutException; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.ipc.RemoteException; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail 042 * feature. 043 * <p> 044 * The motivation is as follows : In case where a large number of clients try and talk to a 045 * particular region server in hbase, if the region server goes down due to network problems, we 046 * might end up in a scenario where the clients would go into a state where they all start to retry. 047 * This behavior will set off many of the threads in pretty much the same path and they all would be 048 * sleeping giving rise to a state where the client either needs to create more threads to send new 049 * requests to other hbase machines or block because the client cannot create anymore threads. 050 * <p> 051 * In most cases the clients might prefer to have a bound on the number of threads that are created 052 * in order to send requests to hbase. This would mostly result in the client thread starvation. 053 * <p> 054 * To circumvent this problem, the approach that is being taken here under is to let 1 of the many 055 * threads who are trying to contact the regionserver with connection problems and let the other 056 * threads get a {@link PreemptiveFastFailException} so that they can move on and take other 057 * requests. 058 * <p> 059 * This would give the client more flexibility on the kind of action he would want to take in cases 060 * where the regionserver is down. He can either discard the requests and send a nack upstream 061 * faster or have an application level retry or buffer the requests up so as to send them down to 062 * hbase later. 063 */ 064@InterfaceAudience.Private 065class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { 066 067 private static final Logger LOG = LoggerFactory 068 .getLogger(PreemptiveFastFailInterceptor.class); 069 070 // amount of time to wait before we consider a server to be in fast fail 071 // mode 072 protected final long fastFailThresholdMilliSec; 073 074 // Keeps track of failures when we cannot talk to a server. Helps in 075 // fast failing clients if the server is down for a long time. 076 protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<>(); 077 078 // We populate repeatedFailuresMap every time there is a failure. So, to 079 // keep it from growing unbounded, we garbage collect the failure information 080 // every cleanupInterval. 081 protected final long failureMapCleanupIntervalMilliSec; 082 083 protected volatile long lastFailureMapCleanupTimeMilliSec; 084 085 // clear failure Info. Used to clean out all entries. 086 // A safety valve, in case the client does not exit the 087 // fast fail mode for any reason. 088 private long fastFailClearingTimeMilliSec; 089 090 private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<>(); 091 092 public PreemptiveFastFailInterceptor(Configuration conf) { 093 this.fastFailThresholdMilliSec = conf.getLong( 094 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 095 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT); 096 this.failureMapCleanupIntervalMilliSec = conf.getLong( 097 HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS, 098 HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS_DEFAULT); 099 this.fastFailClearingTimeMilliSec = conf.getLong( 100 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, 101 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT); 102 lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime(); 103 } 104 105 public void intercept(FastFailInterceptorContext context) 106 throws PreemptiveFastFailException { 107 context.setFailureInfo(repeatedFailuresMap.get(context.getServer())); 108 if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) { 109 // In Fast-fail mode, all but one thread will fast fail. Check 110 // if we are that one chosen thread. 111 context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context 112 .getFailureInfo())); 113 if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry 114 LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : " 115 + context.getTries()); 116 throw new PreemptiveFastFailException( 117 context.getFailureInfo().numConsecutiveFailures.get(), 118 context.getFailureInfo().timeOfFirstFailureMilliSec, 119 context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), 120 context.getGuaranteedClientSideOnly().isTrue()); 121 } 122 } 123 context.setDidTry(true); 124 } 125 126 public void handleFailure(FastFailInterceptorContext context, 127 Throwable t) throws IOException { 128 handleThrowable(t, context.getServer(), 129 context.getCouldNotCommunicateWithServer(), 130 context.getGuaranteedClientSideOnly()); 131 } 132 133 public void updateFailureInfo(FastFailInterceptorContext context) { 134 updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), 135 context.didTry(), context.getCouldNotCommunicateWithServer() 136 .booleanValue(), context.isRetryDespiteFastFailMode()); 137 } 138 139 /** 140 * Handles failures encountered when communicating with a server. 141 * 142 * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. 143 * Throws RepeatedConnectException if the client is in Fast fail mode. 144 * 145 * @param serverName 146 * @param t 147 * - the throwable to be handled. 148 * @throws PreemptiveFastFailException 149 */ 150 protected void handleFailureToServer(ServerName serverName, Throwable t) { 151 if (serverName == null || t == null) { 152 return; 153 } 154 long currentTime = EnvironmentEdgeManager.currentTime(); 155 FailureInfo fInfo = 156 computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime)); 157 fInfo.timeOfLatestAttemptMilliSec = currentTime; 158 fInfo.numConsecutiveFailures.incrementAndGet(); 159 } 160 161 public void handleThrowable(Throwable t1, ServerName serverName, 162 MutableBoolean couldNotCommunicateWithServer, 163 MutableBoolean guaranteedClientSideOnly) throws IOException { 164 Throwable t2 = ClientExceptionsUtil.translatePFFE(t1); 165 boolean isLocalException = !(t2 instanceof RemoteException); 166 167 if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2))) { 168 couldNotCommunicateWithServer.setValue(true); 169 guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); 170 handleFailureToServer(serverName, t2); 171 } 172 } 173 174 /** 175 * Occasionally cleans up unused information in repeatedFailuresMap. 176 * 177 * repeatedFailuresMap stores the failure information for all remote hosts 178 * that had failures. In order to avoid these from growing indefinitely, 179 * occassionallyCleanupFailureInformation() will clear these up once every 180 * cleanupInterval ms. 181 */ 182 protected void occasionallyCleanupFailureInformation() { 183 long now = System.currentTimeMillis(); 184 if (!(now > lastFailureMapCleanupTimeMilliSec 185 + failureMapCleanupIntervalMilliSec)) 186 return; 187 188 // remove entries that haven't been attempted in a while 189 // No synchronization needed. It is okay if multiple threads try to 190 // remove the entry again and again from a concurrent hash map. 191 StringBuilder sb = new StringBuilder(); 192 for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) { 193 if (now > entry.getValue().timeOfLatestAttemptMilliSec 194 + failureMapCleanupIntervalMilliSec) { // no recent failures 195 repeatedFailuresMap.remove(entry.getKey()); 196 } else if (now > entry.getValue().timeOfFirstFailureMilliSec 197 + this.fastFailClearingTimeMilliSec) { // been failing for a long 198 // time 199 LOG.error(entry.getKey() 200 + " been failing for a long time. clearing out." 201 + entry.getValue().toString()); 202 repeatedFailuresMap.remove(entry.getKey()); 203 } else { 204 sb.append(entry.getKey().toString()).append(" failing ") 205 .append(entry.getValue().toString()).append("\n"); 206 } 207 } 208 if (sb.length() > 0) { 209 LOG.warn("Preemptive failure enabled for : " + sb.toString()); 210 } 211 lastFailureMapCleanupTimeMilliSec = now; 212 } 213 214 /** 215 * Checks to see if we are in the Fast fail mode for requests to the server. 216 * 217 * If a client is unable to contact a server for more than 218 * fastFailThresholdMilliSec the client will get into fast fail mode. 219 * 220 * @param server 221 * @return true if the client is in fast fail mode for the server. 222 */ 223 private boolean inFastFailMode(ServerName server) { 224 FailureInfo fInfo = repeatedFailuresMap.get(server); 225 // if fInfo is null --> The server is considered good. 226 // If the server is bad, wait long enough to believe that the server is 227 // down. 228 return (fInfo != null && 229 EnvironmentEdgeManager.currentTime() > 230 (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); 231 } 232 233 /** 234 * Checks to see if the current thread is already in FastFail mode for *some* 235 * server. 236 * 237 * @return true, if the thread is already in FF mode. 238 */ 239 private boolean currentThreadInFastFailMode() { 240 return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode 241 .get().booleanValue() == true)); 242 } 243 244 /** 245 * Check to see if the client should try to connnect to the server, inspite of 246 * knowing that it is in the fast fail mode. 247 * 248 * The idea here is that we want just one client thread to be actively trying 249 * to reconnect, while all the other threads trying to reach the server will 250 * short circuit. 251 * 252 * @param fInfo 253 * @return true if the client should try to connect to the server. 254 */ 255 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { 256 // We believe that the server is down, But, we want to have just one 257 // client 258 // actively trying to connect. If we are the chosen one, we will retry 259 // and not throw an exception. 260 if (fInfo != null 261 && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) { 262 MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode 263 .get(); 264 if (threadAlreadyInFF == null) { 265 threadAlreadyInFF = new MutableBoolean(); 266 this.threadRetryingInFastFailMode.set(threadAlreadyInFF); 267 } 268 threadAlreadyInFF.setValue(true); 269 return true; 270 } else { 271 return false; 272 } 273 } 274 275 /** 276 * 277 * This function updates the Failure info for a particular server after the 278 * attempt to 279 * 280 * @param server 281 * @param fInfo 282 * @param couldNotCommunicate 283 * @param retryDespiteFastFailMode 284 */ 285 private void updateFailureInfoForServer(ServerName server, 286 FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, 287 boolean retryDespiteFastFailMode) { 288 if (server == null || fInfo == null || didTry == false) 289 return; 290 291 // If we were able to connect to the server, reset the failure 292 // information. 293 if (couldNotCommunicate == false) { 294 LOG.info("Clearing out PFFE for server " + server); 295 repeatedFailuresMap.remove(server); 296 } else { 297 // update time of last attempt 298 long currentTime = System.currentTimeMillis(); 299 fInfo.timeOfLatestAttemptMilliSec = currentTime; 300 301 // Release the lock if we were retrying inspite of FastFail 302 if (retryDespiteFastFailMode) { 303 fInfo.exclusivelyRetringInspiteOfFastFail.set(false); 304 threadRetryingInFastFailMode.get().setValue(false); 305 } 306 } 307 308 occasionallyCleanupFailureInformation(); 309 } 310 311 @Override 312 public void intercept(RetryingCallerInterceptorContext context) 313 throws PreemptiveFastFailException { 314 if (context instanceof FastFailInterceptorContext) { 315 intercept((FastFailInterceptorContext) context); 316 } 317 } 318 319 @Override 320 public void handleFailure(RetryingCallerInterceptorContext context, 321 Throwable t) throws IOException { 322 if (context instanceof FastFailInterceptorContext) { 323 handleFailure((FastFailInterceptorContext) context, t); 324 } 325 } 326 327 @Override 328 public void updateFailureInfo(RetryingCallerInterceptorContext context) { 329 if (context instanceof FastFailInterceptorContext) { 330 updateFailureInfo((FastFailInterceptorContext) context); 331 } 332 } 333 334 @Override 335 public RetryingCallerInterceptorContext createEmptyContext() { 336 return new FastFailInterceptorContext(); 337 } 338 339 protected boolean isServerInFailureMap(ServerName serverName) { 340 return this.repeatedFailuresMap.containsKey(serverName); 341 } 342 343 @Override 344 public String toString() { 345 return "PreemptiveFastFailInterceptor"; 346 } 347}