/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;

/**
 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
 * feature.
 * <p>
 * The motivation is as follows: when a large number of clients talk to a particular region
 * server in hbase and that region server goes down due to network problems, the clients can all
 * end up in a state where they keep retrying. Many threads then follow pretty much the same
 * path and sleep between retries, so the client either needs to create more threads to send new
 * requests to other hbase machines, or block because it cannot create any more threads.
 * <p>
 * In most cases clients prefer to have a bound on the number of threads that are created in
 * order to send requests to hbase. Against a server that stays down, such a bound mostly
 * results in client thread starvation.
 * <p>
 * To circumvent this problem, the approach taken here is to let only one of the many threads
 * that are trying to contact the regionserver with connection problems keep retrying, and let
 * the other threads get a {@link PreemptiveFastFailException} so that they can move on and take
 * other requests.
 * <p>
 * This gives the client more flexibility in the kind of action it wants to take when the
 * regionserver is down: it can discard the requests and send a nack upstream faster, retry at
 * the application level, or buffer the requests up so as to send them down to hbase later.
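 * <p>
 * A minimal sketch of how a client might enable this feature through its configuration. The
 * {@code HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE} switch consulted by the interceptor
 * factory is assumed here; the threshold key below is the one this class actually reads.
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Assumed switch: turn on preemptive fast fail for this client (off by default).
 * conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true);
 * // How long a server must be unreachable before clients enter fast fail mode for it.
 * conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 60000);
 * Connection connection = ConnectionFactory.createConnection(conf);
 * </pre>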
 */
@InterfaceAudience.Private
class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {

  private static final Logger LOG = LoggerFactory
      .getLogger(PreemptiveFastFailInterceptor.class);

  // amount of time to wait before we consider a server to be in fast fail
  // mode
  protected final long fastFailThresholdMilliSec;

  // Keeps track of failures when we cannot talk to a server. Helps in
  // fast failing clients if the server is down for a long time.
  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<>();

  // We populate repeatedFailuresMap every time there is a failure. So, to
  // keep it from growing unbounded, we garbage collect the failure information
  // every cleanupInterval.
  protected final long failureMapCleanupIntervalMilliSec;

  protected volatile long lastFailureMapCleanupTimeMilliSec;

  // Time after which the failure info for a server is cleared out entirely.
  // A safety valve, in case the client does not exit the
  // fast fail mode for any reason.
  private long fastFailClearingTimeMilliSec;

  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<>();

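  /**
   * Creates the interceptor, reading the fast fail threshold and the failure map cleanup
   * interval from the supplied configuration.
   *
   * @param conf the client configuration that carries the fast fail settings
   */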
  public PreemptiveFastFailInterceptor(Configuration conf) {
    this.fastFailThresholdMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
    this.failureMapCleanupIntervalMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
  }

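  /**
   * Checks whether the server named in the context is in fast fail mode. If it is, and the
   * current thread is not the one chosen to keep retrying, a {@link PreemptiveFastFailException}
   * is thrown so that the caller can fail fast instead of piling up another retry.
   *
   * @param context the context for the current call attempt
   * @throws PreemptiveFastFailException if the server is in fast fail mode and this thread
   *           should not retry
   */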
  public void intercept(FastFailInterceptorContext context)
      throws PreemptiveFastFailException {
    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
      // In Fast-fail mode, all but one thread will fast fail. Check
      // if we are that one chosen thread.
      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
          .getFailureInfo()));
      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
        LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
            + context.getTries());
        throw new PreemptiveFastFailException(
            context.getFailureInfo().numConsecutiveFailures.get(),
            context.getFailureInfo().timeOfFirstFailureMilliSec,
            context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(),
            context.getGuaranteedClientSideOnly().isTrue());
      }
    }
    context.setDidTry(true);
  }

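  /**
   * Records a failed call attempt by delegating to
   * {@link #handleThrowable(Throwable, ServerName, MutableBoolean, MutableBoolean)} for the
   * server named in the context.
   *
   * @param context the context for the failed call attempt
   * @param t the throwable raised by the call attempt
   */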
  public void handleFailure(FastFailInterceptorContext context,
      Throwable t) throws IOException {
    handleThrowable(t, context.getServer(),
        context.getCouldNotCommunicateWithServer(),
        context.getGuaranteedClientSideOnly());
  }

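  /**
   * Updates the failure information for the server named in the context once the call attempt
   * has completed, clearing the entry on success and recording the latest attempt on failure.
   *
   * @param context the context for the completed call attempt
   */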
  public void updateFailureInfo(FastFailInterceptorContext context) {
    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
        context.didTry(), context.getCouldNotCommunicateWithServer()
            .booleanValue(), context.isRetryDespiteFastFailMode());
  }

  /**
   * Handles failures encountered when communicating with a server.
   *
   * Updates the FailureInfo in repeatedFailuresMap to reflect the failure, creating a new
   * entry for the server if none exists yet.
   *
   * @param serverName the server that could not be reached
   * @param t the throwable to be handled
   */
  @VisibleForTesting
  protected void handleFailureToServer(ServerName serverName, Throwable t) {
    if (serverName == null || t == null) {
      return;
    }
    long currentTime = EnvironmentEdgeManager.currentTime();
    FailureInfo fInfo =
        computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime));
    fInfo.timeOfLatestAttemptMilliSec = currentTime;
    fInfo.numConsecutiveFailures.incrementAndGet();
  }

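  /**
   * Inspects the throwable and, if it represents a client-side connection problem, marks the
   * call as unable to communicate with the server and records the failure against that server.
   *
   * @param t1 the throwable raised by the call attempt
   * @param serverName the server the call was directed at
   * @param couldNotCommunicateWithServer set to true if the failure was a connection problem
   * @param guaranteedClientSideOnly set to true if the request is guaranteed not to have
   *          reached the server (i.e. the failure was not a call timeout)
   */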
  public void handleThrowable(Throwable t1, ServerName serverName,
      MutableBoolean couldNotCommunicateWithServer,
      MutableBoolean guaranteedClientSideOnly) throws IOException {
    Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
    boolean isLocalException = !(t2 instanceof RemoteException);

    if (isLocalException && ClientExceptionsUtil.isConnectionException(t2)) {
      couldNotCommunicateWithServer.setValue(true);
      guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
      handleFailureToServer(serverName, t2);
    }
  }

  /**
   * Occasionally cleans up unused information in repeatedFailuresMap.
   *
   * repeatedFailuresMap stores the failure information for all remote hosts that had failures.
   * To keep it from growing indefinitely, occasionallyCleanupFailureInformation() clears out
   * stale entries once every cleanupInterval ms.
   */
  protected void occasionallyCleanupFailureInformation() {
    long now = EnvironmentEdgeManager.currentTime();
    if (now <= lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec) {
      return;
    }

    // remove entries that haven't been attempted in a while
    // No synchronization needed. It is okay if multiple threads try to
    // remove the entry again and again from a concurrent hash map.
    StringBuilder sb = new StringBuilder();
    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
      if (now > entry.getValue().timeOfLatestAttemptMilliSec
          + failureMapCleanupIntervalMilliSec) { // no recent failures
        repeatedFailuresMap.remove(entry.getKey());
      } else if (now > entry.getValue().timeOfFirstFailureMilliSec
          + this.fastFailClearingTimeMilliSec) { // has been failing for a long time
        LOG.error(entry.getKey() + " has been failing for a long time. Clearing out: "
            + entry.getValue());
        repeatedFailuresMap.remove(entry.getKey());
      } else {
        sb.append(entry.getKey().toString()).append(" failing ")
            .append(entry.getValue().toString()).append("\n");
      }
    }
    if (sb.length() > 0) {
      LOG.warn("Preemptive fast fail enabled for: " + sb);
    }
    lastFailureMapCleanupTimeMilliSec = now;
  }

  /**
   * Checks to see if we are in the Fast fail mode for requests to the server.
   *
   * If a client is unable to contact a server for more than fastFailThresholdMilliSec, the
   * client enters fast fail mode for that server.
   *
   * @param server the server to check
   * @return true if the client is in fast fail mode for the server.
   */
  private boolean inFastFailMode(ServerName server) {
    FailureInfo fInfo = repeatedFailuresMap.get(server);
    // if fInfo is null --> The server is considered good.
    // If the server is bad, wait long enough to believe that the server is
    // down.
    return (fInfo != null &&
        EnvironmentEdgeManager.currentTime() >
          (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
  }

  /**
   * Checks to see if the current thread is already in FastFail mode for *some*
   * server.
   *
   * @return true, if the thread is already in FF mode.
   */
  private boolean currentThreadInFastFailMode() {
    MutableBoolean threadFastFailMode = this.threadRetryingInFastFailMode.get();
    return threadFastFailMode != null && threadFastFailMode.isTrue();
  }

  /**
   * Check to see if the client should try to connect to the server, in spite of knowing that
   * it is in the fast fail mode.
   *
   * The idea here is that we want just one client thread to be actively trying to reconnect,
   * while all the other threads trying to reach the server will short circuit.
   *
   * @param fInfo the failure information for the server
   * @return true if the client should try to connect to the server.
   */
  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
    // We believe that the server is down, but we want to have just one client
    // actively trying to connect. If we are the chosen one, we will retry
    // and not throw an exception.
    if (fInfo != null
        && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
      MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode.get();
      if (threadAlreadyInFF == null) {
        threadAlreadyInFF = new MutableBoolean();
        this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
      }
      threadAlreadyInFF.setValue(true);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Updates the failure info for a particular server after an attempt to contact it. On
   * success the server's entry is cleared from repeatedFailuresMap; on failure the time of the
   * latest attempt is recorded and, if this thread was retrying in spite of fast fail, the
   * retry lock is released.
   *
   * @param server the server that was contacted
   * @param fInfo the failure information for the server
   * @param didTry whether an attempt to contact the server was actually made
   * @param couldNotCommunicate whether the attempt failed to communicate with the server
   * @param retryDespiteFastFailMode whether this thread was retrying in spite of fast fail mode
   */
  private void updateFailureInfoForServer(ServerName server,
      FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
      boolean retryDespiteFastFailMode) {
    if (server == null || fInfo == null || !didTry) {
      return;
    }

    // If we were able to connect to the server, reset the failure
    // information.
    if (!couldNotCommunicate) {
      LOG.info("Clearing out PFFE for server " + server);
      repeatedFailuresMap.remove(server);
    } else {
      // update time of last attempt
      long currentTime = EnvironmentEdgeManager.currentTime();
      fInfo.timeOfLatestAttemptMilliSec = currentTime;

      // Release the lock if we were retrying in spite of FastFail
      if (retryDespiteFastFailMode) {
        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
        threadRetryingInFastFailMode.get().setValue(false);
      }
    }

    occasionallyCleanupFailureInformation();
  }

  @Override
  public void intercept(RetryingCallerInterceptorContext context)
      throws PreemptiveFastFailException {
    if (context instanceof FastFailInterceptorContext) {
      intercept((FastFailInterceptorContext) context);
    }
  }

  @Override
  public void handleFailure(RetryingCallerInterceptorContext context,
      Throwable t) throws IOException {
    if (context instanceof FastFailInterceptorContext) {
      handleFailure((FastFailInterceptorContext) context, t);
    }
  }

  @Override
  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
    if (context instanceof FastFailInterceptorContext) {
      updateFailureInfo((FastFailInterceptorContext) context);
    }
  }

  @Override
  public RetryingCallerInterceptorContext createEmptyContext() {
    return new FastFailInterceptorContext();
  }

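  /**
   * Checks whether the given server currently has an entry in the failure map.
   *
   * @param serverName the server to look up
   * @return true if the server has an entry in repeatedFailuresMap
   */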
  protected boolean isServerInFailureMap(ServerName serverName) {
    return this.repeatedFailuresMap.containsKey(serverName);
  }

  @Override
  public String toString() {
    return "PreemptiveFastFailInterceptor";
  }
}