View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.SyncFailedException;
23  import java.lang.reflect.UndeclaredThrowableException;
24  import java.net.ConnectException;
25  import java.net.SocketTimeoutException;
26  import java.nio.channels.ClosedChannelException;
27  import java.util.Map.Entry;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.TimeoutException;
31  
32  import org.apache.commons.lang.mutable.MutableBoolean;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
41  import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
42  import org.apache.hadoop.hbase.ipc.FailedServerException;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44  import org.apache.hadoop.ipc.RemoteException;
45  
46  /**
47   * 
48   * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
49   * feature.
50   * 
51   * The motivation is as follows : 
52   * In case where a large number of clients try and talk to a particular region server in hbase, if
53   * the region server goes down due to network problems, we might end up in a scenario where
54   * the clients would go into a state where they all start to retry.
55   * This behavior will set off many of the threads in pretty much the same path and they all would be
56   * sleeping giving rise to a state where the client either needs to create more threads to send new
57   * requests to other hbase machines or block because the client cannot create anymore threads.
58   * 
59   * In most cases the clients might prefer to have a bound on the number of threads that are created
60   * in order to send requests to hbase. This would mostly result in the client thread starvation.
61   * 
 *  To circumvent this problem, the approach taken here is to let only one of the many threads
 *  that are trying to contact the regionserver with connection problems keep retrying, and to
 *  let the other threads get a {@link PreemptiveFastFailException} so that they can move on
 *  and take other requests.
66   *  
67   *  This would give the client more flexibility on the kind of action he would want to take in cases
68   *  where the regionserver is down. He can either discard the requests and send a nack upstream
69   *  faster or have an application level retry or buffer the requests up so as to send them down to
70   *  hbase later.
71   *
72   */
73  @InterfaceAudience.Private
74  class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
75  
  // Logger shared by this interceptor.
  public static final Log LOG = LogFactory
      .getLog(PreemptiveFastFailInterceptor.class);

  // How long (in ms) a server must have been failing, measured from its first
  // recorded failure, before requests to it are considered to be in fast fail
  // mode.
  protected final long fastFailThresholdMilliSec;

  // Keeps track of failures when we cannot talk to a server. Helps in
  // fast failing clients if the server is down for a long time.
  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
      new ConcurrentHashMap<ServerName, FailureInfo>();

  // We populate repeatedFailuresMap every time there is a failure. So, to
  // keep it from growing unbounded, we garbage collect the failure information
  // every cleanupInterval. The same interval is also used as the idle age
  // after which an entry with no recent attempt is dropped.
  protected final long failureMapCleanupIntervalMilliSec;

  // Time (in ms) at which the last cleanup pass over repeatedFailuresMap ran.
  protected volatile long lastFailureMapCleanupTimeMilliSec;

  // clear failure Info. Used to clean out all entries.
  // A safety valve, in case the client does not exit the
  // fast fail mode for any reason.
  // NOTE(review): nothing in this class assigns this field, so it keeps its
  // default value of 0 -- confirm whether it should be read from configuration.
  private long fastFailClearingTimeMilliSec;

  // Per-thread flag: true while the current thread is the one chosen to keep
  // retrying against a server that is in fast fail mode.
  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
      new ThreadLocal<MutableBoolean>();
102 
103   public PreemptiveFastFailInterceptor(Configuration conf) {
104     this.fastFailThresholdMilliSec = conf.getLong(
105         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
106         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
107     this.failureMapCleanupIntervalMilliSec = conf.getLong(
108         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
109         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
110     lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
111   }
112 
113   public void intercept(FastFailInterceptorContext context)
114       throws PreemptiveFastFailException {
115     context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
116     if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
117       // In Fast-fail mode, all but one thread will fast fail. Check
118       // if we are that one chosen thread.
119       context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
120           .getFailureInfo()));
121       if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
122         LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
123             + context.getTries());
124         throw new PreemptiveFastFailException(
125             context.getFailureInfo().numConsecutiveFailures.get(),
126             context.getFailureInfo().timeOfFirstFailureMilliSec,
127             context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
128       }
129     }
130     context.setDidTry(true);
131   }
132 
133   public void handleFailure(FastFailInterceptorContext context,
134       Throwable t) throws IOException {
135     handleThrowable(t, context.getServer(),
136         context.getCouldNotCommunicateWithServer());
137   }
138 
139   public void updateFailureInfo(FastFailInterceptorContext context) {
140     updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
141         context.didTry(), context.getCouldNotCommunicateWithServer()
142             .booleanValue(), context.isRetryDespiteFastFailMode());
143   }
144 
145   /**
146    * Handles failures encountered when communicating with a server.
147    *
148    * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
149    * Throws RepeatedConnectException if the client is in Fast fail mode.
150    *
151    * @param serverName
152    * @param t
153    *          - the throwable to be handled.
154    * @throws PreemptiveFastFailException
155    */
156   private void handleFailureToServer(ServerName serverName, Throwable t) {
157     if (serverName == null || t == null) {
158       return;
159     }
160     long currentTime = EnvironmentEdgeManager.currentTime();
161     FailureInfo fInfo = repeatedFailuresMap.get(serverName);
162     if (fInfo == null) {
163       fInfo = new FailureInfo(currentTime);
164       FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
165 
166       if (oldfInfo != null) {
167         fInfo = oldfInfo;
168       }
169     }
170     fInfo.timeOfLatestAttemptMilliSec = currentTime;
171     fInfo.numConsecutiveFailures.incrementAndGet();
172   }
173 
174   public void handleThrowable(Throwable t1, ServerName serverName,
175       MutableBoolean couldNotCommunicateWithServer) throws IOException {
176     Throwable t2 = translateException(t1);
177     boolean isLocalException = !(t2 instanceof RemoteException);
178     if (isLocalException && isConnectionException(t2)) {
179       couldNotCommunicateWithServer.setValue(true);
180       handleFailureToServer(serverName, t2);
181     }
182   }
183 
184   private Throwable translateException(Throwable t) throws IOException {
185     if (t instanceof NoSuchMethodError) {
186       // We probably can't recover from this exception by retrying.
187       LOG.error(t);
188       throw (NoSuchMethodError) t;
189     }
190 
191     if (t instanceof NullPointerException) {
192       // The same here. This is probably a bug.
193       LOG.error(t.getMessage(), t);
194       throw (NullPointerException) t;
195     }
196 
197     if (t instanceof UndeclaredThrowableException) {
198       t = t.getCause();
199     }
200     if (t instanceof RemoteException) {
201       t = ((RemoteException) t).unwrapRemoteException();
202     }
203     if (t instanceof DoNotRetryIOException) {
204       throw (DoNotRetryIOException) t;
205     }
206     if (t instanceof Error) {
207       throw (Error) t;
208     }
209     return t;
210   }
211 
212   /**
213    * Check if the exception is something that indicates that we cannot
214    * contact/communicate with the server.
215    *
216    * @param e
217    * @return true when exception indicates that the client wasn't able to make contact with server
218    */
219   private boolean isConnectionException(Throwable e) {
220     if (e == null)
221       return false;
222     // This list covers most connectivity exceptions but not all.
223     // For example, in SocketOutputStream a plain IOException is thrown
224     // at times when the channel is closed.
225     return (e instanceof SocketTimeoutException
226         || e instanceof ConnectException || e instanceof ClosedChannelException
227         || e instanceof SyncFailedException || e instanceof EOFException
228         || e instanceof TimeoutException
229         || e instanceof ConnectionClosingException || e instanceof FailedServerException);
230   }
231 
232   /**
233    * Occasionally cleans up unused information in repeatedFailuresMap.
234    *
235    * repeatedFailuresMap stores the failure information for all remote hosts
236    * that had failures. In order to avoid these from growing indefinitely,
237    * occassionallyCleanupFailureInformation() will clear these up once every
238    * cleanupInterval ms.
239    */
240   protected void occasionallyCleanupFailureInformation() {
241     long now = System.currentTimeMillis();
242     if (!(now > lastFailureMapCleanupTimeMilliSec
243         + failureMapCleanupIntervalMilliSec))
244       return;
245 
246     // remove entries that haven't been attempted in a while
247     // No synchronization needed. It is okay if multiple threads try to
248     // remove the entry again and again from a concurrent hash map.
249     StringBuilder sb = new StringBuilder();
250     for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
251       if (now > entry.getValue().timeOfLatestAttemptMilliSec
252           + failureMapCleanupIntervalMilliSec) { // no recent failures
253         repeatedFailuresMap.remove(entry.getKey());
254       } else if (now > entry.getValue().timeOfFirstFailureMilliSec
255           + this.fastFailClearingTimeMilliSec) { // been failing for a long
256                                                  // time
257         LOG.error(entry.getKey()
258             + " been failing for a long time. clearing out."
259             + entry.getValue().toString());
260         repeatedFailuresMap.remove(entry.getKey());
261       } else {
262         sb.append(entry.getKey().toString()).append(" failing ")
263             .append(entry.getValue().toString()).append("\n");
264       }
265     }
266     if (sb.length() > 0) {
267       LOG.warn("Preemptive failure enabled for : " + sb.toString());
268     }
269     lastFailureMapCleanupTimeMilliSec = now;
270   }
271 
272   /**
273    * Checks to see if we are in the Fast fail mode for requests to the server.
274    *
275    * If a client is unable to contact a server for more than
276    * fastFailThresholdMilliSec the client will get into fast fail mode.
277    *
278    * @param server
279    * @return true if the client is in fast fail mode for the server.
280    */
281   private boolean inFastFailMode(ServerName server) {
282     FailureInfo fInfo = repeatedFailuresMap.get(server);
283     // if fInfo is null --> The server is considered good.
284     // If the server is bad, wait long enough to believe that the server is
285     // down.
286     return (fInfo != null &&
287         EnvironmentEdgeManager.currentTime() >
288           (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
289   }
290 
291   /**
292    * Checks to see if the current thread is already in FastFail mode for *some*
293    * server.
294    *
295    * @return true, if the thread is already in FF mode.
296    */
297   private boolean currentThreadInFastFailMode() {
298     return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
299         .get().booleanValue() == true));
300   }
301 
302   /**
303    * Check to see if the client should try to connnect to the server, inspite of
304    * knowing that it is in the fast fail mode.
305    *
306    * The idea here is that we want just one client thread to be actively trying
307    * to reconnect, while all the other threads trying to reach the server will
308    * short circuit.
309    *
310    * @param fInfo
311    * @return true if the client should try to connect to the server.
312    */
313   protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
314     // We believe that the server is down, But, we want to have just one
315     // client
316     // actively trying to connect. If we are the chosen one, we will retry
317     // and not throw an exception.
318     if (fInfo != null
319         && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
320       MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
321           .get();
322       if (threadAlreadyInFF == null) {
323         threadAlreadyInFF = new MutableBoolean();
324         this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
325       }
326       threadAlreadyInFF.setValue(true);
327       return true;
328     } else {
329       return false;
330     }
331   }
332 
333   /**
334    *
335    * This function updates the Failure info for a particular server after the
336    * attempt to 
337    *
338    * @param server
339    * @param fInfo
340    * @param couldNotCommunicate
341    * @param retryDespiteFastFailMode
342    */
343   private void updateFailureInfoForServer(ServerName server,
344       FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
345       boolean retryDespiteFastFailMode) {
346     if (server == null || fInfo == null || didTry == false)
347       return;
348 
349     // If we were able to connect to the server, reset the failure
350     // information.
351     if (couldNotCommunicate == false) {
352       LOG.info("Clearing out PFFE for server " + server.getServerName());
353       repeatedFailuresMap.remove(server);
354     } else {
355       // update time of last attempt
356       long currentTime = System.currentTimeMillis();
357       fInfo.timeOfLatestAttemptMilliSec = currentTime;
358 
359       // Release the lock if we were retrying inspite of FastFail
360       if (retryDespiteFastFailMode) {
361         fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
362         threadRetryingInFastFailMode.get().setValue(false);
363       }
364     }
365 
366     occasionallyCleanupFailureInformation();
367   }
368 
369   @Override
370   public void intercept(RetryingCallerInterceptorContext context)
371       throws PreemptiveFastFailException {
372     if (context instanceof FastFailInterceptorContext) {
373       intercept((FastFailInterceptorContext) context);
374     }
375   }
376 
377   @Override
378   public void handleFailure(RetryingCallerInterceptorContext context,
379       Throwable t) throws IOException {
380     if (context instanceof FastFailInterceptorContext) {
381       handleFailure((FastFailInterceptorContext) context, t);
382     }
383   }
384 
385   @Override
386   public void updateFailureInfo(RetryingCallerInterceptorContext context) {
387     if (context instanceof FastFailInterceptorContext) {
388       updateFailureInfo((FastFailInterceptorContext) context);
389     }
390   }
391 
392   @Override
393   public RetryingCallerInterceptorContext createEmptyContext() {
394     return new FastFailInterceptorContext();
395   }
396 
397   protected boolean isServerInFailureMap(ServerName serverName) {
398     return this.repeatedFailuresMap.containsKey(serverName);
399   }
400 
401   @Override
402   public String toString() {
403     return "PreemptiveFastFailInterceptor";
404   }
405 }