/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
 * feature.
 * <p>
 * The motivation is as follows: when a large number of clients talk to a particular region server
 * in hbase and that region server goes down due to network problems, we might end up in a scenario
 * where all the clients start to retry. This sends many threads down pretty much the same path,
 * all of them sleeping between attempts, so the client either needs to create more threads to send
 * new requests to other hbase machines or block because it cannot create any more threads.
 * <p>
 * In most cases clients prefer to have a bound on the number of threads created to send requests
 * to hbase, so this retry pile-up mostly results in client thread starvation.
 * <p>
 * To circumvent this problem, the approach taken here is to let exactly one of the many threads
 * keep trying to contact the regionserver with connection problems, and to let the other threads
 * get a {@link PreemptiveFastFailException} so that they can move on and take other requests.
 * <p>
 * This gives the client more flexibility in how it reacts when a regionserver is down. It can
 * discard the requests and send a nack upstream faster, retry at the application level, or buffer
 * the requests up so as to send them down to hbase later.
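 * <p>
 * A minimal sketch of how a client opts in to this feature; it assumes the
 * {@code hbase.client.fast.fail.mode.enabled} key and the fast-fail constants in
 * {@link HConstants}, so treat it as illustrative rather than definitive:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Opt in to preemptive fast fail so the interceptor factory hands out this class
 * // (key name assumed; see HConstants).
 * conf.setBoolean("hbase.client.fast.fail.mode.enabled", true);
 * // How long a server must keep failing before clients enter fast fail mode for it.
 * conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 60000);
 * RetryingCallerInterceptor interceptor = new PreemptiveFastFailInterceptor(conf);
 * </pre>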
 */
@InterfaceAudience.Private
class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(PreemptiveFastFailInterceptor.class);

  // amount of time to wait before we consider a server to be in fast fail
  // mode
  protected final long fastFailThresholdMilliSec;

  // Keeps track of failures when we cannot talk to a server. Helps in
  // fast failing clients if the server is down for a long time.
  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
    new ConcurrentHashMap<>();

  // We populate repeatedFailuresMap every time there is a failure. So, to
  // keep it from growing unbounded, we garbage collect the failure information
  // every cleanupInterval.
  protected final long failureMapCleanupIntervalMilliSec;

  protected volatile long lastFailureMapCleanupTimeMilliSec;

  // Time after which we clear out all failure info entries. A safety valve,
  // in case the client does not exit the fast fail mode for any reason.
  private long fastFailClearingTimeMilliSec;

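  // Per-thread flag recording whether the current thread is the one chosen to
  // keep retrying a server that is in fast fail mode.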
  private static final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
    new ThreadLocal<>();

  public PreemptiveFastFailInterceptor(Configuration conf) {
    this.fastFailThresholdMilliSec = conf.getLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
      HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
    this.failureMapCleanupIntervalMilliSec =
      conf.getLong(HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS,
        HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS_DEFAULT);
    this.fastFailClearingTimeMilliSec =
      conf.getLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
  }

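  /**
   * Called before each attempt. If the target server is in fast fail mode and this thread is not
   * the single thread allowed to keep retrying, fails the call immediately with a
   * {@link PreemptiveFastFailException}.
   */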
  public void intercept(FastFailInterceptorContext context) throws PreemptiveFastFailException {
    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
      // In Fast-fail mode, all but one thread will fast fail. Check
      // if we are that one chosen thread.
      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context.getFailureInfo()));
      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
        LOG.debug("Throwing PFFE : {} tries : {}", context.getFailureInfo(), context.getTries());
        throw new PreemptiveFastFailException(context.getFailureInfo().numConsecutiveFailures.get(),
          context.getFailureInfo().timeOfFirstFailureMilliSec,
          context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(),
          context.getGuaranteedClientSideOnly().isTrue());
      }
    }
    context.setDidTry(true);
  }

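  /**
   * Called when an attempt fails. Records the failure against the server, noting whether it was a
   * connection-level problem and whether it is guaranteed to have happened client side only.
   */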
  public void handleFailure(FastFailInterceptorContext context, Throwable t) throws IOException {
    handleThrowable(t, context.getServer(), context.getCouldNotCommunicateWithServer(),
      context.getGuaranteedClientSideOnly());
  }

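  /**
   * Called after the attempt completes. Clears or refreshes the failure information for the
   * server and, if this thread was the chosen retrying thread, releases that role.
   */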
  public void updateFailureInfo(FastFailInterceptorContext context) {
    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), context.didTry(),
      context.getCouldNotCommunicateWithServer().booleanValue(),
      context.isRetryDespiteFastFailMode());
  }

  /**
   * Handles failures encountered when communicating with a server. Updates the FailureInfo in
   * repeatedFailuresMap to reflect the failure.
   * @param serverName the server we failed to communicate with
   * @param t the throwable to be handled
   */
  protected void handleFailureToServer(ServerName serverName, Throwable t) {
    if (serverName == null || t == null) {
      return;
    }
    long currentTime = EnvironmentEdgeManager.currentTime();
    FailureInfo fInfo =
      computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime));
    fInfo.timeOfLatestAttemptMilliSec = currentTime;
    fInfo.numConsecutiveFailures.incrementAndGet();
  }

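  /**
   * Translates the throwable and, if it turns out to be a local connection-level problem, marks
   * the call as a failure to communicate with the server and records it in repeatedFailuresMap.
   */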
  public void handleThrowable(Throwable t1, ServerName serverName,
    MutableBoolean couldNotCommunicateWithServer, MutableBoolean guaranteedClientSideOnly)
    throws IOException {
    Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
    boolean isLocalException = !(t2 instanceof RemoteException);

    if (isLocalException && ClientExceptionsUtil.isConnectionException(t2)) {
      couldNotCommunicateWithServer.setValue(true);
      guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
      handleFailureToServer(serverName, t2);
    }
  }

  /**
   * Occasionally cleans up unused information in repeatedFailuresMap. repeatedFailuresMap stores
   * the failure information for all remote hosts that had failures. To keep these entries from
   * growing indefinitely, occasionallyCleanupFailureInformation() clears them up once every
   * cleanupInterval ms.
   */
  protected void occasionallyCleanupFailureInformation() {
    long now = EnvironmentEdgeManager.currentTime();
    if (now <= lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec) {
      return;
    }

    // Remove entries that haven't been attempted in a while. No synchronization
    // needed; it is okay if multiple threads try to remove the same entry from a
    // concurrent hash map.
    StringBuilder sb = new StringBuilder();
    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
      if (now > entry.getValue().timeOfLatestAttemptMilliSec + failureMapCleanupIntervalMilliSec) {
        // no recent failures
        repeatedFailuresMap.remove(entry.getKey());
      } else if (
        now > entry.getValue().timeOfFirstFailureMilliSec + this.fastFailClearingTimeMilliSec
      ) {
        // has been failing for a long time
        LOG.error(entry.getKey() + " has been failing for a long time. Clearing out "
          + entry.getValue().toString());
        repeatedFailuresMap.remove(entry.getKey());
      } else {
        sb.append(entry.getKey().toString()).append(" failing ")
          .append(entry.getValue().toString()).append("\n");
      }
    }
    if (sb.length() > 0) {
      LOG.warn("Preemptive failure enabled for : " + sb.toString());
    }
    lastFailureMapCleanupTimeMilliSec = now;
  }

  /**
   * Checks to see if we are in the fast fail mode for requests to the server. If a client is
   * unable to contact a server for more than fastFailThresholdMilliSec, the client enters fast
   * fail mode for that server.
   * @param server the server to check
   * @return true if the client is in fast fail mode for the server
   */
  private boolean inFastFailMode(ServerName server) {
    FailureInfo fInfo = repeatedFailuresMap.get(server);
    // if fInfo is null --> the server is considered good.
    // If the server is bad, wait long enough to believe that the server is
    // down.
    return (fInfo != null && EnvironmentEdgeManager.currentTime()
        > (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
  }

  /**
   * Checks to see if the current thread is already in FastFail mode for *some* server.
   * @return true, if the thread is already in FF mode.
   */
  private boolean currentThreadInFastFailMode() {
    MutableBoolean threadAlreadyInFF = threadRetryingInFastFailMode.get();
    return threadAlreadyInFF != null && threadAlreadyInFF.isTrue();
  }

  /**
   * Check to see if the client should try to connect to the server, in spite of knowing that it
   * is in fast fail mode. The idea here is that we want just one client thread to be actively
   * trying to reconnect, while all the other threads trying to reach the server will short
   * circuit.
   * @param fInfo the failure information for the server
   * @return true if the client should try to connect to the server
   */
  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
    // We believe that the server is down, but we want to have just one client
    // actively trying to connect. If we are the chosen one, we will retry
    // and not throw an exception.
    if (fInfo != null && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
      MutableBoolean threadAlreadyInFF = threadRetryingInFastFailMode.get();
      if (threadAlreadyInFF == null) {
        threadAlreadyInFF = new MutableBoolean();
        threadRetryingInFastFailMode.set(threadAlreadyInFF);
      }
      threadAlreadyInFF.setValue(true);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Updates the failure info for a particular server after an attempt to contact it.
   * @param server the server we attempted to contact
   * @param fInfo the failure information for the server
   * @param didTry whether the attempt was actually made
   * @param couldNotCommunicate whether we failed to communicate with the server
   * @param retryDespiteFastFailMode whether this thread was the one retrying in spite of fast
   *          fail mode
   */
  private void updateFailureInfoForServer(ServerName server, FailureInfo fInfo, boolean didTry,
    boolean couldNotCommunicate, boolean retryDespiteFastFailMode) {
    if (server == null || fInfo == null || !didTry) {
      return;
    }

    // If we were able to connect to the server, reset the failure
    // information.
    if (!couldNotCommunicate) {
      LOG.info("Clearing out PFFE for server " + server);
      repeatedFailuresMap.remove(server);
    } else {
      // update time of last attempt
      long currentTime = EnvironmentEdgeManager.currentTime();
      fInfo.timeOfLatestAttemptMilliSec = currentTime;

      // Release the lock if we were retrying in spite of fast fail
      if (retryDespiteFastFailMode) {
        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
        threadRetryingInFastFailMode.get().setValue(false);
      }
    }

    occasionallyCleanupFailureInformation();
  }

  @Override
  public void intercept(RetryingCallerInterceptorContext context)
    throws PreemptiveFastFailException {
    if (context instanceof FastFailInterceptorContext) {
      intercept((FastFailInterceptorContext) context);
    }
  }

  @Override
  public void handleFailure(RetryingCallerInterceptorContext context, Throwable t)
    throws IOException {
    if (context instanceof FastFailInterceptorContext) {
      handleFailure((FastFailInterceptorContext) context, t);
    }
  }

  @Override
  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
    if (context instanceof FastFailInterceptorContext) {
      updateFailureInfo((FastFailInterceptorContext) context);
    }
  }

  @Override
  public RetryingCallerInterceptorContext createEmptyContext() {
    return new FastFailInterceptorContext();
  }

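  /** Returns true if the given server currently has failures recorded in repeatedFailuresMap. */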
  protected boolean isServerInFailureMap(ServerName serverName) {
    return this.repeatedFailuresMap.containsKey(serverName);
  }

  @Override
  public String toString() {
    return "PreemptiveFastFailInterceptor";
  }
}