/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
 * feature.
 * <p>
 * The motivation is as follows: when a large number of clients talk to a particular region server
 * and that region server becomes unreachable (for example, because of network problems), all of
 * those clients start retrying against it. Many threads end up sleeping in the same retry path,
 * so the client must either create more threads to send new requests to other HBase machines or
 * block because it cannot create any more threads.
 * <p>
 * Most clients prefer to bound the number of threads used to send requests to HBase, so this
 * behavior typically results in client thread starvation.
 * <p>
 * To circumvent this problem, the approach taken here is to let only one of the threads that are
 * trying to contact the troubled region server keep retrying, and to give the other threads a
 * {@link PreemptiveFastFailException} so that they can move on and take other requests.
 * <p>
 * This gives the client more flexibility in how it reacts to a downed region server: it can
 * discard the requests and send a negative acknowledgement upstream faster, retry at the
 * application level, or buffer the requests and send them to HBase later.
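 * <p>
 * A minimal configuration sketch (illustrative only; it assumes the standard
 * "hbase.client.fast.fail.mode.enabled" property is what switches this interceptor on in your
 * HBase version):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Opt in to preemptive fast fail on the client side (assumed enable flag).
 * conf.setBoolean("hbase.client.fast.fail.mode.enabled", true);
 * // How long a server must be unreachable before clients enter fast fail mode.
 * conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 60000);
 * try (Connection connection = ConnectionFactory.createConnection(conf)) {
 *   // Retries against an unreachable region server now surface
 *   // PreemptiveFastFailException quickly on all but one caller thread,
 *   // instead of keeping every caller thread asleep in the retry loop.
 * }
 * }</pre>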
 */
@InterfaceAudience.Private
class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {

  private static final Logger LOG = LoggerFactory
      .getLogger(PreemptiveFastFailInterceptor.class);

  // amount of time to wait before we consider a server to be in fast fail
  // mode
  protected final long fastFailThresholdMilliSec;

  // Keeps track of failures when we cannot talk to a server. Helps in
  // fast failing clients if the server is down for a long time.
  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<>();

  // We populate repeatedFailuresMap every time there is a failure. So, to
  // keep it from growing unbounded, we garbage collect the failure information
  // every cleanupInterval.
  protected final long failureMapCleanupIntervalMilliSec;

  protected volatile long lastFailureMapCleanupTimeMilliSec;

  // Time after which a failure entry is cleared unconditionally. A safety
  // valve used to clean out all entries, in case the client does not exit
  // the fast fail mode for any other reason.
  private long fastFailClearingTimeMilliSec;

  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<>();

  public PreemptiveFastFailInterceptor(Configuration conf) {
    this.fastFailThresholdMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
    this.failureMapCleanupIntervalMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS,
        HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS_DEFAULT);
    this.fastFailClearingTimeMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
  }

  public void intercept(FastFailInterceptorContext context)
      throws PreemptiveFastFailException {
    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
      // In fast fail mode, all but one thread will fast fail. Check
      // if we are that one chosen thread.
      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
          .getFailureInfo()));
      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
        LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
            + context.getTries());
        throw new PreemptiveFastFailException(
            context.getFailureInfo().numConsecutiveFailures.get(),
            context.getFailureInfo().timeOfFirstFailureMilliSec,
            context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(),
            context.getGuaranteedClientSideOnly().isTrue());
      }
    }
    context.setDidTry(true);
  }

  public void handleFailure(FastFailInterceptorContext context,
      Throwable t) throws IOException {
    handleThrowable(t, context.getServer(),
        context.getCouldNotCommunicateWithServer(),
        context.getGuaranteedClientSideOnly());
  }

  public void updateFailureInfo(FastFailInterceptorContext context) {
    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
        context.didTry(), context.getCouldNotCommunicateWithServer()
            .booleanValue(), context.isRetryDespiteFastFailMode());
  }

  /**
   * Handles failures encountered when communicating with a server.
   *
   * Updates the {@link FailureInfo} in repeatedFailuresMap to reflect the failure, creating an
   * entry for the server if one does not already exist.
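   * <p>
   * Illustrative test-style sketch only (the server name shown is a hypothetical example value):
   * <pre>{@code
   * PreemptiveFastFailInterceptor interceptor = new PreemptiveFastFailInterceptor(conf);
   * ServerName serverName = ServerName.valueOf("example-host,16020,1234567890");
   * interceptor.handleFailureToServer(serverName, new java.net.ConnectException("refused"));
   * // The failure is now tracked for this server.
   * assert interceptor.isServerInFailureMap(serverName);
   * }</pre>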
   *
   * @param serverName the server that could not be reached
   * @param t the throwable to be handled
   */
  @VisibleForTesting
  protected void handleFailureToServer(ServerName serverName, Throwable t) {
    if (serverName == null || t == null) {
      return;
    }
    long currentTime = EnvironmentEdgeManager.currentTime();
    FailureInfo fInfo =
        computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime));
    fInfo.timeOfLatestAttemptMilliSec = currentTime;
    fInfo.numConsecutiveFailures.incrementAndGet();
  }

  public void handleThrowable(Throwable t1, ServerName serverName,
      MutableBoolean couldNotCommunicateWithServer,
      MutableBoolean guaranteedClientSideOnly) throws IOException {
    Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
    boolean isLocalException = !(t2 instanceof RemoteException);

    if (isLocalException && ClientExceptionsUtil.isConnectionException(t2)) {
      couldNotCommunicateWithServer.setValue(true);
      guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
      handleFailureToServer(serverName, t2);
    }
  }

  /**
   * Occasionally cleans up unused information in repeatedFailuresMap.
   *
   * repeatedFailuresMap stores the failure information for all remote hosts
   * that had failures. To keep it from growing indefinitely,
   * occasionallyCleanupFailureInformation() clears out stale entries once every
   * failureMapCleanupIntervalMilliSec ms.
   */
  protected void occasionallyCleanupFailureInformation() {
    long now = EnvironmentEdgeManager.currentTime();
    if (now <= lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec) {
      return;
    }

    // Remove entries that haven't been attempted in a while.
    // No synchronization needed. It is okay if multiple threads try to
    // remove the entry again and again from a concurrent hash map.
    StringBuilder sb = new StringBuilder();
    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
      if (now > entry.getValue().timeOfLatestAttemptMilliSec
          + failureMapCleanupIntervalMilliSec) { // no recent failures
        repeatedFailuresMap.remove(entry.getKey());
      } else if (now > entry.getValue().timeOfFirstFailureMilliSec
          + this.fastFailClearingTimeMilliSec) { // been failing for a long time
        LOG.error(entry.getKey()
            + " has been failing for a long time. Clearing out "
            + entry.getValue().toString());
        repeatedFailuresMap.remove(entry.getKey());
      } else {
        sb.append(entry.getKey().toString()).append(" failing ")
            .append(entry.getValue().toString()).append("\n");
      }
    }
    if (sb.length() > 0) {
      LOG.warn("Preemptive failure enabled for : " + sb.toString());
    }
    lastFailureMapCleanupTimeMilliSec = now;
  }

  /**
   * Checks to see if we are in the fast fail mode for requests to the server.
   *
   * If a client is unable to contact a server for more than
   * fastFailThresholdMilliSec, the client enters fast fail mode for that server.
   *
   * @param server the server being contacted
   * @return true if the client is in fast fail mode for the server.
   */
  private boolean inFastFailMode(ServerName server) {
    FailureInfo fInfo = repeatedFailuresMap.get(server);
    // if fInfo is null --> The server is considered good.
    // If the server is bad, wait long enough to believe that the server is
    // down.
    return (fInfo != null &&
        EnvironmentEdgeManager.currentTime() >
          (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
  }

  /**
   * Checks to see if the current thread is already in fast fail mode for *some*
   * server.
   *
   * @return true, if the thread is already in fast fail mode.
   */
  private boolean currentThreadInFastFailMode() {
    MutableBoolean threadInFastFail = this.threadRetryingInFastFailMode.get();
    return threadInFastFail != null && threadInFastFail.booleanValue();
  }

  /**
   * Checks to see if the client should try to connect to the server, in spite of
   * knowing that it is in the fast fail mode.
   *
   * The idea here is that we want just one client thread to be actively trying
   * to reconnect, while all the other threads trying to reach the server will
   * short circuit.
   *
   * @param fInfo the failure information for the server
   * @return true if the client should try to connect to the server.
   */
  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
    // We believe that the server is down, but we want to have just one client
    // actively trying to connect. If we are the chosen one, we will retry
    // and not throw an exception.
    if (fInfo != null
        && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
      MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
          .get();
      if (threadAlreadyInFF == null) {
        threadAlreadyInFF = new MutableBoolean();
        this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
      }
      threadAlreadyInFF.setValue(true);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Updates the failure info for a particular server after an attempt to contact it, clearing
   * the entry on success and recording the time of the latest attempt on failure.
   *
   * @param server the server that was contacted
   * @param fInfo the failure information for the server
   * @param didTry whether an attempt was actually made
   * @param couldNotCommunicate whether the attempt failed to communicate with the server
   * @param retryDespiteFastFailMode whether this thread was the one retrying in fast fail mode
   */
  private void updateFailureInfoForServer(ServerName server,
      FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
      boolean retryDespiteFastFailMode) {
    if (server == null || fInfo == null || !didTry) {
      return;
    }

    // If we were able to connect to the server, reset the failure
    // information.
    if (!couldNotCommunicate) {
      LOG.info("Clearing out PFFE for server " + server);
      repeatedFailuresMap.remove(server);
    } else {
      // update time of last attempt
      long currentTime = EnvironmentEdgeManager.currentTime();
      fInfo.timeOfLatestAttemptMilliSec = currentTime;

      // Release the lock if we were retrying in spite of fast fail
      if (retryDespiteFastFailMode) {
        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
        threadRetryingInFastFailMode.get().setValue(false);
      }
    }

    occasionallyCleanupFailureInformation();
  }

  @Override
  public void intercept(RetryingCallerInterceptorContext context)
      throws PreemptiveFastFailException {
    if (context instanceof FastFailInterceptorContext) {
      intercept((FastFailInterceptorContext) context);
    }
  }

  @Override
  public void handleFailure(RetryingCallerInterceptorContext context,
      Throwable t) throws IOException {
    if (context instanceof FastFailInterceptorContext) {
      handleFailure((FastFailInterceptorContext) context, t);
    }
  }

  @Override
  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
    if (context instanceof FastFailInterceptorContext) {
      updateFailureInfo((FastFailInterceptorContext) context);
    }
  }

  @Override
  public RetryingCallerInterceptorContext createEmptyContext() {
    return new FastFailInterceptorContext();
  }

  protected boolean isServerInFailureMap(ServerName serverName) {
    return this.repeatedFailuresMap.containsKey(serverName);
  }

  @Override
  public String toString() {
    return "PreemptiveFastFailInterceptor";
  }
}