1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.EOFException;
21 import java.io.IOException;
22 import java.io.SyncFailedException;
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.net.ConnectException;
25 import java.net.SocketTimeoutException;
26 import java.nio.channels.ClosedChannelException;
27 import java.util.Map.Entry;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.TimeoutException;
31
32 import org.apache.commons.lang.mutable.MutableBoolean;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
41 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
42 import org.apache.hadoop.hbase.ipc.FailedServerException;
43 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44 import org.apache.hadoop.ipc.RemoteException;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Private
74 class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
75
76 public static final Log LOG = LogFactory
77 .getLog(PreemptiveFastFailInterceptor.class);
78
79
80
81 protected final long fastFailThresholdMilliSec;
82
83
84
85 protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
86 new ConcurrentHashMap<ServerName, FailureInfo>();
87
88
89
90
91 protected final long failureMapCleanupIntervalMilliSec;
92
93 protected volatile long lastFailureMapCleanupTimeMilliSec;
94
95
96
97
98 private long fastFailClearingTimeMilliSec;
99
100 private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
101 new ThreadLocal<MutableBoolean>();
102
103 public PreemptiveFastFailInterceptor(Configuration conf) {
104 this.fastFailThresholdMilliSec = conf.getLong(
105 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
106 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
107 this.failureMapCleanupIntervalMilliSec = conf.getLong(
108 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
109 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
110 lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
111 }
112
113 public void intercept(FastFailInterceptorContext context)
114 throws PreemptiveFastFailException {
115 context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
116 if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
117
118
119 context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
120 .getFailureInfo()));
121 if (!context.isRetryDespiteFastFailMode()) {
122 LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
123 + context.getTries());
124 throw new PreemptiveFastFailException(
125 context.getFailureInfo().numConsecutiveFailures.get(),
126 context.getFailureInfo().timeOfFirstFailureMilliSec,
127 context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
128 }
129 }
130 context.setDidTry(true);
131 }
132
133 public void handleFailure(FastFailInterceptorContext context,
134 Throwable t) throws IOException {
135 handleThrowable(t, context.getServer(),
136 context.getCouldNotCommunicateWithServer());
137 }
138
139 public void updateFailureInfo(FastFailInterceptorContext context) {
140 updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
141 context.didTry(), context.getCouldNotCommunicateWithServer()
142 .booleanValue(), context.isRetryDespiteFastFailMode());
143 }
144
145
146
147
148
149
150
151
152
153
154
155
156 private void handleFailureToServer(ServerName serverName, Throwable t) {
157 if (serverName == null || t == null) {
158 return;
159 }
160 long currentTime = EnvironmentEdgeManager.currentTime();
161 FailureInfo fInfo = repeatedFailuresMap.get(serverName);
162 if (fInfo == null) {
163 fInfo = new FailureInfo(currentTime);
164 FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
165
166 if (oldfInfo != null) {
167 fInfo = oldfInfo;
168 }
169 }
170 fInfo.timeOfLatestAttemptMilliSec = currentTime;
171 fInfo.numConsecutiveFailures.incrementAndGet();
172 }
173
174 public void handleThrowable(Throwable t1, ServerName serverName,
175 MutableBoolean couldNotCommunicateWithServer) throws IOException {
176 Throwable t2 = translateException(t1);
177 boolean isLocalException = !(t2 instanceof RemoteException);
178 if (isLocalException && isConnectionException(t2)) {
179 couldNotCommunicateWithServer.setValue(true);
180 handleFailureToServer(serverName, t2);
181 }
182 }
183
184 private Throwable translateException(Throwable t) throws IOException {
185 if (t instanceof NoSuchMethodError) {
186
187 LOG.error(t);
188 throw (NoSuchMethodError) t;
189 }
190
191 if (t instanceof NullPointerException) {
192
193 LOG.error(t.getMessage(), t);
194 throw (NullPointerException) t;
195 }
196
197 if (t instanceof UndeclaredThrowableException) {
198 t = t.getCause();
199 }
200 if (t instanceof RemoteException) {
201 t = ((RemoteException) t).unwrapRemoteException();
202 }
203 if (t instanceof DoNotRetryIOException) {
204 throw (DoNotRetryIOException) t;
205 }
206 if (t instanceof Error) {
207 throw (Error) t;
208 }
209 return t;
210 }
211
212
213
214
215
216
217
218
219 private boolean isConnectionException(Throwable e) {
220 if (e == null)
221 return false;
222
223
224
225 return (e instanceof SocketTimeoutException
226 || e instanceof ConnectException || e instanceof ClosedChannelException
227 || e instanceof SyncFailedException || e instanceof EOFException
228 || e instanceof TimeoutException
229 || e instanceof ConnectionClosingException || e instanceof FailedServerException);
230 }
231
232
233
234
235
236
237
238
239
240 protected void occasionallyCleanupFailureInformation() {
241 long now = System.currentTimeMillis();
242 if (!(now > lastFailureMapCleanupTimeMilliSec
243 + failureMapCleanupIntervalMilliSec))
244 return;
245
246
247
248
249 StringBuilder sb = new StringBuilder();
250 for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
251 if (now > entry.getValue().timeOfLatestAttemptMilliSec
252 + failureMapCleanupIntervalMilliSec) {
253 repeatedFailuresMap.remove(entry.getKey());
254 } else if (now > entry.getValue().timeOfFirstFailureMilliSec
255 + this.fastFailClearingTimeMilliSec) {
256
257 LOG.error(entry.getKey()
258 + " been failing for a long time. clearing out."
259 + entry.getValue().toString());
260 repeatedFailuresMap.remove(entry.getKey());
261 } else {
262 sb.append(entry.getKey().toString()).append(" failing ")
263 .append(entry.getValue().toString()).append("\n");
264 }
265 }
266 if (sb.length() > 0) {
267 LOG.warn("Preemptive failure enabled for : " + sb.toString());
268 }
269 lastFailureMapCleanupTimeMilliSec = now;
270 }
271
272
273
274
275
276
277
278
279
280
281 private boolean inFastFailMode(ServerName server) {
282 FailureInfo fInfo = repeatedFailuresMap.get(server);
283
284
285
286 return (fInfo != null &&
287 EnvironmentEdgeManager.currentTime() >
288 (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
289 }
290
291
292
293
294
295
296
297 private boolean currentThreadInFastFailMode() {
298 return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
299 .get().booleanValue() == true));
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
314
315
316
317
318 if (fInfo != null
319 && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
320 MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
321 .get();
322 if (threadAlreadyInFF == null) {
323 threadAlreadyInFF = new MutableBoolean();
324 this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
325 }
326 threadAlreadyInFF.setValue(true);
327 return true;
328 } else {
329 return false;
330 }
331 }
332
333
334
335
336
337
338
339
340
341
342
343 private void updateFailureInfoForServer(ServerName server,
344 FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
345 boolean retryDespiteFastFailMode) {
346 if (server == null || fInfo == null || didTry == false)
347 return;
348
349
350
351 if (couldNotCommunicate == false) {
352 LOG.info("Clearing out PFFE for server " + server.getServerName());
353 repeatedFailuresMap.remove(server);
354 } else {
355
356 long currentTime = System.currentTimeMillis();
357 fInfo.timeOfLatestAttemptMilliSec = currentTime;
358
359
360 if (retryDespiteFastFailMode) {
361 fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
362 threadRetryingInFastFailMode.get().setValue(false);
363 }
364 }
365
366 occasionallyCleanupFailureInformation();
367 }
368
369 @Override
370 public void intercept(RetryingCallerInterceptorContext context)
371 throws PreemptiveFastFailException {
372 if (context instanceof FastFailInterceptorContext) {
373 intercept((FastFailInterceptorContext) context);
374 }
375 }
376
377 @Override
378 public void handleFailure(RetryingCallerInterceptorContext context,
379 Throwable t) throws IOException {
380 if (context instanceof FastFailInterceptorContext) {
381 handleFailure((FastFailInterceptorContext) context, t);
382 }
383 }
384
385 @Override
386 public void updateFailureInfo(RetryingCallerInterceptorContext context) {
387 if (context instanceof FastFailInterceptorContext) {
388 updateFailureInfo((FastFailInterceptorContext) context);
389 }
390 }
391
392 @Override
393 public RetryingCallerInterceptorContext createEmptyContext() {
394 return new FastFailInterceptorContext();
395 }
396
397 protected boolean isServerInFailureMap(ServerName serverName) {
398 return this.repeatedFailuresMap.containsKey(serverName);
399 }
400
401 @Override
402 public String toString() {
403 return "PreemptiveFastFailInterceptor";
404 }
405 }