1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.SocketTimeoutException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.hbase.util.ExceptionUtil;
37 import org.apache.hadoop.ipc.RemoteException;
38
39 import com.google.protobuf.ServiceException;
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class RpcRetryingCaller<T> {
52 public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
53
54
55
56 private long globalStartTime;
57
58
59
60 private final static int MIN_RPC_TIMEOUT = 2000;
61
62 private final int startLogErrorsCnt;
63
64 private final long pause;
65 private final int retries;
66 private final int rpcTimeout;
67 private final AtomicBoolean cancelled = new AtomicBoolean(false);
68 private final RetryingCallerInterceptor interceptor;
69 private final RetryingCallerInterceptorContext context;
70
71 public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
72 this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
73 }
74
75 public RpcRetryingCaller(long pause, int retries,
76 RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
77 this.pause = pause;
78 this.retries = retries;
79 this.interceptor = interceptor;
80 context = interceptor.createEmptyContext();
81 this.startLogErrorsCnt = startLogErrorsCnt;
82 this.rpcTimeout = rpcTimeout;
83 }
84
85 private int getRemainingTime(int callTimeout) {
86 if (callTimeout <= 0) {
87 return 0;
88 } else {
89 if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
90 int remainingTime = (int) (callTimeout -
91 (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
92 if (remainingTime < MIN_RPC_TIMEOUT) {
93
94
95
96 remainingTime = MIN_RPC_TIMEOUT;
97 }
98 return remainingTime;
99 }
100 }
101
102 private int getTimeout(int callTimeout){
103 int timeout = getRemainingTime(callTimeout);
104 if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
105 timeout = rpcTimeout;
106 }
107 return timeout;
108 }
109
110 public void cancel(){
111 synchronized (cancelled){
112 cancelled.set(true);
113 cancelled.notifyAll();
114 }
115 }
116
117
118
119
120
121
122
123
124
125 public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
126 throws IOException, RuntimeException {
127 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
128 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
129 this.globalStartTime = EnvironmentEdgeManager.currentTime();
130 context.clear();
131 for (int tries = 0;; tries++) {
132 long expectedSleep;
133 try {
134 callable.prepare(tries != 0);
135 interceptor.intercept(context.prepare(callable, tries));
136 return callable.call(getTimeout(callTimeout));
137 } catch (PreemptiveFastFailException e) {
138 throw e;
139 } catch (Throwable t) {
140 ExceptionUtil.rethrowIfInterrupt(t);
141 if (tries > startLogErrorsCnt) {
142 LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
143 (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
144 + "cancelled=" + cancelled.get() + ", msg="
145 + callable.getExceptionMessageAdditionalDetail());
146 }
147
148
149 interceptor.handleFailure(context, t);
150 t = translateException(t);
151 callable.throwable(t, retries != 1);
152 RetriesExhaustedException.ThrowableWithExtraContext qt =
153 new RetriesExhaustedException.ThrowableWithExtraContext(t,
154 EnvironmentEdgeManager.currentTime(), toString());
155 exceptions.add(qt);
156 if (tries >= retries - 1) {
157 throw new RetriesExhaustedException(tries, exceptions);
158 }
159
160
161
162 expectedSleep = callable.sleep(pause, tries);
163
164
165 long duration = singleCallDuration(expectedSleep);
166 if (duration > callTimeout) {
167 String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
168 ": " + callable.getExceptionMessageAdditionalDetail();
169 throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
170 }
171 } finally {
172 interceptor.updateFailureInfo(context);
173 }
174 try {
175 if (expectedSleep > 0) {
176 synchronized (cancelled) {
177 if (cancelled.get()) return null;
178 cancelled.wait(expectedSleep);
179 }
180 }
181 if (cancelled.get()) return null;
182 } catch (InterruptedException e) {
183 throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
184 }
185 }
186 }
187
188
189
190
191 private long singleCallDuration(final long expectedSleep) {
192 return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
193 }
194
195
196
197
198
199
200
201
202
203
204 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
205 throws IOException, RuntimeException {
206
207 this.globalStartTime = EnvironmentEdgeManager.currentTime();
208 try {
209 callable.prepare(false);
210 return callable.call(callTimeout);
211 } catch (Throwable t) {
212 Throwable t2 = translateException(t);
213 ExceptionUtil.rethrowIfInterrupt(t2);
214
215 if (t2 instanceof IOException) {
216 throw (IOException)t2;
217 } else {
218 throw new RuntimeException(t2);
219 }
220 }
221 }
222
223
224
225
226
227
228
229 static Throwable translateException(Throwable t) throws DoNotRetryIOException {
230 if (t instanceof UndeclaredThrowableException) {
231 if (t.getCause() != null) {
232 t = t.getCause();
233 }
234 }
235 if (t instanceof RemoteException) {
236 t = ((RemoteException)t).unwrapRemoteException();
237 }
238 if (t instanceof LinkageError) {
239 throw new DoNotRetryIOException(t);
240 }
241 if (t instanceof ServiceException) {
242 ServiceException se = (ServiceException)t;
243 Throwable cause = se.getCause();
244 if (cause != null) {
245 if (cause instanceof DoNotRetryIOException) {
246 throw (DoNotRetryIOException)cause;
247 } else if (cause instanceof NeedUnmanagedConnectionException) {
248 throw new DoNotRetryIOException(cause);
249 }
250 }
251
252 t = cause;
253
254 translateException(t);
255 } else if (t instanceof DoNotRetryIOException) {
256 throw (DoNotRetryIOException)t;
257 } else if (t instanceof NeedUnmanagedConnectionException) {
258 throw new DoNotRetryIOException(t);
259 }
260 return t;
261 }
262
263 @Override
264 public String toString() {
265 return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
266 ", pause=" + pause + ", retries=" + retries + '}';
267 }
268 }