View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.SyncFailedException;
23  import java.lang.reflect.UndeclaredThrowableException;
24  import java.net.ConnectException;
25  import java.net.SocketTimeoutException;
26  import java.nio.channels.ClosedChannelException;
27  import java.util.Map.Entry;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.TimeoutException;
31  
32  import org.apache.commons.lang.mutable.MutableBoolean;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
41  import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
42  import org.apache.hadoop.hbase.ipc.FailedServerException;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44  import org.apache.hadoop.ipc.RemoteException;
45  
46  /**
47   * 
48   * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
49   * feature.
50   * 
51   * The motivation is as follows : 
52   * In case where a large number of clients try and talk to a particular region server in hbase, if
53   * the region server goes down due to network problems, we might end up in a scenario where
54   * the clients would go into a state where they all start to retry.
55   * This behavior will set off many of the threads in pretty much the same path and they all would be
56   * sleeping giving rise to a state where the client either needs to create more threads to send new
57   * requests to other hbase machines or block because the client cannot create anymore threads.
58   * 
59   * In most cases the clients might prefer to have a bound on the number of threads that are created
60   * in order to send requests to hbase. This would mostly result in the client thread starvation.
61   * 
 *  To circumvent this problem, the approach taken here is to let just one of the many threads
 *  that are trying to contact the regionserver with connection problems keep retrying, while the
 *  other threads get a {@link PreemptiveFastFailException} so that they can move on and take
 *  other requests.
66   *  
67   *  This would give the client more flexibility on the kind of action he would want to take in cases
68   *  where the regionserver is down. He can either discard the requests and send a nack upstream
69   *  faster or have an application level retry or buffer the requests up so as to send them down to
70   *  hbase later.
71   *
72   */
73  @InterfaceAudience.Private
74  class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
75  
76    private static final Log LOG = LogFactory
77        .getLog(PreemptiveFastFailInterceptor.class);
78  
79    // amount of time to wait before we consider a server to be in fast fail
80    // mode
81    protected final long fastFailThresholdMilliSec;
82  
83    // Keeps track of failures when we cannot talk to a server. Helps in
84    // fast failing clients if the server is down for a long time.
85    protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
86        new ConcurrentHashMap<ServerName, FailureInfo>();
87  
88    // We populate repeatedFailuresMap every time there is a failure. So, to
89    // keep it from growing unbounded, we garbage collect the failure information
90    // every cleanupInterval.
91    protected final long failureMapCleanupIntervalMilliSec;
92  
93    protected volatile long lastFailureMapCleanupTimeMilliSec;
94  
95    // clear failure Info. Used to clean out all entries.
96    // A safety valve, in case the client does not exit the
97    // fast fail mode for any reason.
98    private long fastFailClearingTimeMilliSec;
99  
100   private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
101       new ThreadLocal<MutableBoolean>();
102 
103   public PreemptiveFastFailInterceptor(Configuration conf) {
104     this.fastFailThresholdMilliSec = conf.getLong(
105         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
106         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
107     this.failureMapCleanupIntervalMilliSec = conf.getLong(
108         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
109         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
110     lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
111   }
112 
113   public void intercept(FastFailInterceptorContext context)
114       throws PreemptiveFastFailException {
115     context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
116     if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
117       // In Fast-fail mode, all but one thread will fast fail. Check
118       // if we are that one chosen thread.
119       context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
120           .getFailureInfo()));
121       if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
122         LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
123             + context.getTries());
124         throw new PreemptiveFastFailException(
125             context.getFailureInfo().numConsecutiveFailures.get(),
126             context.getFailureInfo().timeOfFirstFailureMilliSec,
127             context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
128       }
129     }
130     context.setDidTry(true);
131   }
132 
133   public void handleFailure(FastFailInterceptorContext context,
134       Throwable t) throws IOException {
135     handleThrowable(t, context.getServer(),
136         context.getCouldNotCommunicateWithServer());
137   }
138 
139   public void updateFailureInfo(FastFailInterceptorContext context) {
140     updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
141         context.didTry(), context.getCouldNotCommunicateWithServer()
142             .booleanValue(), context.isRetryDespiteFastFailMode());
143   }
144 
145   /**
146    * Handles failures encountered when communicating with a server.
147    *
148    * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
149    * Throws RepeatedConnectException if the client is in Fast fail mode.
150    *
151    * @param serverName
152    * @param t
153    *          - the throwable to be handled.
154    * @throws PreemptiveFastFailException
155    */
156   private void handleFailureToServer(ServerName serverName, Throwable t) {
157     if (serverName == null || t == null) {
158       return;
159     }
160     long currentTime = EnvironmentEdgeManager.currentTime();
161     FailureInfo fInfo = repeatedFailuresMap.get(serverName);
162     if (fInfo == null) {
163       fInfo = new FailureInfo(currentTime);
164       FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
165 
166       if (oldfInfo != null) {
167         fInfo = oldfInfo;
168       }
169     }
170     fInfo.timeOfLatestAttemptMilliSec = currentTime;
171     fInfo.numConsecutiveFailures.incrementAndGet();
172   }
173 
174   public void handleThrowable(Throwable t1, ServerName serverName,
175       MutableBoolean couldNotCommunicateWithServer) throws IOException {
176     Throwable t2 = translateException(t1);
177     boolean isLocalException = !(t2 instanceof RemoteException);
178     if (isLocalException && isConnectionException(t2)) {
179       couldNotCommunicateWithServer.setValue(true);
180       handleFailureToServer(serverName, t2);
181     }
182   }
183 
184   private Throwable translateException(Throwable t) throws IOException {
185     if (t instanceof NoSuchMethodError) {
186       // We probably can't recover from this exception by retrying.
187       LOG.error(t);
188       throw (NoSuchMethodError) t;
189     }
190 
191     if (t instanceof NullPointerException) {
192       // The same here. This is probably a bug.
193       LOG.error(t.getMessage(), t);
194       throw (NullPointerException) t;
195     }
196 
197     if (t instanceof UndeclaredThrowableException) {
198       t = t.getCause();
199     }
200     if (t instanceof RemoteException) {
201       t = ((RemoteException) t).unwrapRemoteException();
202     }
203     if (t instanceof DoNotRetryIOException) {
204       throw (DoNotRetryIOException) t;
205     }
206     if (t instanceof NeedUnmanagedConnectionException) {
207       throw new DoNotRetryIOException(t);
208     }
209     if (t instanceof Error) {
210       throw (Error) t;
211     }
212     return t;
213   }
214 
215   /**
216    * Check if the exception is something that indicates that we cannot
217    * contact/communicate with the server.
218    *
219    * @param e
220    * @return true when exception indicates that the client wasn't able to make contact with server
221    */
222   private boolean isConnectionException(Throwable e) {
223     if (e == null)
224       return false;
225     // This list covers most connectivity exceptions but not all.
226     // For example, in SocketOutputStream a plain IOException is thrown
227     // at times when the channel is closed.
228     return (e instanceof SocketTimeoutException
229         || e instanceof ConnectException || e instanceof ClosedChannelException
230         || e instanceof SyncFailedException || e instanceof EOFException
231         || e instanceof TimeoutException
232         || e instanceof ConnectionClosingException || e instanceof FailedServerException);
233   }
234 
235   /**
236    * Occasionally cleans up unused information in repeatedFailuresMap.
237    *
238    * repeatedFailuresMap stores the failure information for all remote hosts
239    * that had failures. In order to avoid these from growing indefinitely,
240    * occassionallyCleanupFailureInformation() will clear these up once every
241    * cleanupInterval ms.
242    */
243   protected void occasionallyCleanupFailureInformation() {
244     long now = System.currentTimeMillis();
245     if (!(now > lastFailureMapCleanupTimeMilliSec
246         + failureMapCleanupIntervalMilliSec))
247       return;
248 
249     // remove entries that haven't been attempted in a while
250     // No synchronization needed. It is okay if multiple threads try to
251     // remove the entry again and again from a concurrent hash map.
252     StringBuilder sb = new StringBuilder();
253     for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
254       if (now > entry.getValue().timeOfLatestAttemptMilliSec
255           + failureMapCleanupIntervalMilliSec) { // no recent failures
256         repeatedFailuresMap.remove(entry.getKey());
257       } else if (now > entry.getValue().timeOfFirstFailureMilliSec
258           + this.fastFailClearingTimeMilliSec) { // been failing for a long
259                                                  // time
260         LOG.error(entry.getKey()
261             + " been failing for a long time. clearing out."
262             + entry.getValue().toString());
263         repeatedFailuresMap.remove(entry.getKey());
264       } else {
265         sb.append(entry.getKey().toString()).append(" failing ")
266             .append(entry.getValue().toString()).append("\n");
267       }
268     }
269     if (sb.length() > 0) {
270       LOG.warn("Preemptive failure enabled for : " + sb.toString());
271     }
272     lastFailureMapCleanupTimeMilliSec = now;
273   }
274 
275   /**
276    * Checks to see if we are in the Fast fail mode for requests to the server.
277    *
278    * If a client is unable to contact a server for more than
279    * fastFailThresholdMilliSec the client will get into fast fail mode.
280    *
281    * @param server
282    * @return true if the client is in fast fail mode for the server.
283    */
284   private boolean inFastFailMode(ServerName server) {
285     FailureInfo fInfo = repeatedFailuresMap.get(server);
286     // if fInfo is null --> The server is considered good.
287     // If the server is bad, wait long enough to believe that the server is
288     // down.
289     return (fInfo != null &&
290         EnvironmentEdgeManager.currentTime() >
291           (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
292   }
293 
294   /**
295    * Checks to see if the current thread is already in FastFail mode for *some*
296    * server.
297    *
298    * @return true, if the thread is already in FF mode.
299    */
300   private boolean currentThreadInFastFailMode() {
301     return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
302         .get().booleanValue() == true));
303   }
304 
305   /**
306    * Check to see if the client should try to connnect to the server, inspite of
307    * knowing that it is in the fast fail mode.
308    *
309    * The idea here is that we want just one client thread to be actively trying
310    * to reconnect, while all the other threads trying to reach the server will
311    * short circuit.
312    *
313    * @param fInfo
314    * @return true if the client should try to connect to the server.
315    */
316   protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
317     // We believe that the server is down, But, we want to have just one
318     // client
319     // actively trying to connect. If we are the chosen one, we will retry
320     // and not throw an exception.
321     if (fInfo != null
322         && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
323       MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
324           .get();
325       if (threadAlreadyInFF == null) {
326         threadAlreadyInFF = new MutableBoolean();
327         this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
328       }
329       threadAlreadyInFF.setValue(true);
330       return true;
331     } else {
332       return false;
333     }
334   }
335 
336   /**
337    *
338    * This function updates the Failure info for a particular server after the
339    * attempt to 
340    *
341    * @param server
342    * @param fInfo
343    * @param couldNotCommunicate
344    * @param retryDespiteFastFailMode
345    */
346   private void updateFailureInfoForServer(ServerName server,
347       FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
348       boolean retryDespiteFastFailMode) {
349     if (server == null || fInfo == null || didTry == false)
350       return;
351 
352     // If we were able to connect to the server, reset the failure
353     // information.
354     if (couldNotCommunicate == false) {
355       LOG.info("Clearing out PFFE for server " + server.getServerName());
356       repeatedFailuresMap.remove(server);
357     } else {
358       // update time of last attempt
359       long currentTime = System.currentTimeMillis();
360       fInfo.timeOfLatestAttemptMilliSec = currentTime;
361 
362       // Release the lock if we were retrying inspite of FastFail
363       if (retryDespiteFastFailMode) {
364         fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
365         threadRetryingInFastFailMode.get().setValue(false);
366       }
367     }
368 
369     occasionallyCleanupFailureInformation();
370   }
371 
372   @Override
373   public void intercept(RetryingCallerInterceptorContext context)
374       throws PreemptiveFastFailException {
375     if (context instanceof FastFailInterceptorContext) {
376       intercept((FastFailInterceptorContext) context);
377     }
378   }
379 
380   @Override
381   public void handleFailure(RetryingCallerInterceptorContext context,
382       Throwable t) throws IOException {
383     if (context instanceof FastFailInterceptorContext) {
384       handleFailure((FastFailInterceptorContext) context, t);
385     }
386   }
387 
388   @Override
389   public void updateFailureInfo(RetryingCallerInterceptorContext context) {
390     if (context instanceof FastFailInterceptorContext) {
391       updateFailureInfo((FastFailInterceptorContext) context);
392     }
393   }
394 
395   @Override
396   public RetryingCallerInterceptorContext createEmptyContext() {
397     return new FastFailInterceptorContext();
398   }
399 
400   protected boolean isServerInFailureMap(ServerName serverName) {
401     return this.repeatedFailuresMap.containsKey(serverName);
402   }
403 
404   @Override
405   public String toString() {
406     return "PreemptiveFastFailInterceptor";
407   }
408 }