1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import com.google.protobuf.ServiceException;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
31 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
32 import org.apache.hadoop.hbase.ipc.RpcClient;
33 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.ipc.RemoteException;
37
38 import java.io.IOException;
39 import java.lang.reflect.UndeclaredThrowableException;
40 import java.net.ConnectException;
41 import java.net.SocketTimeoutException;
42 import java.util.ArrayList;
43 import java.util.List;
44 import java.util.concurrent.Callable;
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.Public
58 @InterfaceStability.Stable
59 public abstract class ServerCallable<T> implements Callable<T> {
60 static final Log LOG = LogFactory.getLog(ServerCallable.class);
61
62 protected final HConnection connection;
63 protected final byte [] tableName;
64 protected final byte [] row;
65 protected HRegionLocation location;
66 protected ClientService.BlockingInterface stub;
67 protected int callTimeout;
68 protected long globalStartTime;
69 protected long startTime, endTime;
70 protected final static int MIN_RPC_TIMEOUT = 2000;
71 protected final static int MIN_WAIT_DEAD_SERVER = 10000;
72
73
74
75
76
77
78 public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
79 this(connection, tableName, row, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
80 }
81
82 public ServerCallable(HConnection connection, byte [] tableName, byte [] row, int callTimeout) {
83 this.connection = connection;
84 this.tableName = tableName;
85 this.row = row;
86 this.callTimeout = callTimeout;
87 }
88
89
90
91
92
93
94
95 public void prepare(final boolean reload) throws IOException {
96 this.location = connection.getRegionLocation(tableName, row, reload);
97 this.stub = connection.getClient(location.getServerName());
98 }
99
100
101
102
103 public String getServerName() {
104 if (location == null) return null;
105 return location.getHostnamePort();
106 }
107
108
109
110
111 public byte[] getRegionName() {
112 if (location == null) return null;
113 return location.getRegionInfo().getRegionName();
114 }
115
116
117
118
119 public byte [] getRow() {
120 return row;
121 }
122
123 public void beforeCall() {
124 this.startTime = EnvironmentEdgeManager.currentTimeMillis();
125 int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
126 if (remaining < MIN_RPC_TIMEOUT) {
127
128
129
130 remaining = MIN_RPC_TIMEOUT;
131 }
132 RpcClient.setRpcTimeout(remaining);
133 }
134
135 public void afterCall() {
136 RpcClient.resetRpcTimeout();
137 this.endTime = EnvironmentEdgeManager.currentTimeMillis();
138 }
139
140
141
142
143 HConnection getConnection() {
144 return this.connection;
145 }
146
147
148
149
150
151
152
153
154
155 public T withRetries()
156 throws IOException, RuntimeException {
157 Configuration c = getConnection().getConfiguration();
158 final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
159 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
160 final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
161 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
162 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
163 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
164 this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
165 for (int tries = 0;; tries++) {
166 long expectedSleep = 0;
167 try {
168 beforeCall();
169 prepare(tries != 0);
170 return call();
171 } catch (Throwable t) {
172 LOG.warn("Call exception, tries=" + tries + ", numRetries=" + numRetries + ": " + t);
173
174 t = translateException(t);
175
176
177
178 if (t instanceof SocketTimeoutException ||
179 t instanceof ConnectException ||
180 t instanceof RetriesExhaustedException ||
181 (location != null && getConnection().isDeadServer(location.getServerName()))) {
182
183
184
185 getConnection().clearCaches(location.getServerName());
186 } else if (t instanceof NotServingRegionException && numRetries == 1) {
187
188
189 getConnection().deleteCachedRegionLocation(location);
190 }
191
192 RetriesExhaustedException.ThrowableWithExtraContext qt =
193 new RetriesExhaustedException.ThrowableWithExtraContext(t,
194 EnvironmentEdgeManager.currentTimeMillis(), toString());
195 exceptions.add(qt);
196 if (tries >= numRetries - 1) {
197 throw new RetriesExhaustedException(tries, exceptions);
198 }
199
200
201
202 expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
203 if (expectedSleep < MIN_WAIT_DEAD_SERVER
204 && (location == null || getConnection().isDeadServer(location.getServerName()))) {
205 expectedSleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
206 }
207
208
209 if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
210 this.callTimeout) {
211 throw (SocketTimeoutException) new SocketTimeoutException(
212 "Call to access row '" + Bytes.toString(row) + "' on table '"
213 + Bytes.toString(tableName)
214 + "' failed on timeout. " + " callTimeout=" + this.callTimeout +
215 ", time=" + (this.endTime - this.startTime)).initCause(t);
216 }
217 } finally {
218 afterCall();
219 }
220 try {
221 Thread.sleep(expectedSleep);
222 } catch (InterruptedException e) {
223 Thread.currentThread().interrupt();
224 throw new IOException("Interrupted after " + tries + " tries on " + numRetries, e);
225 }
226 }
227 }
228
229
230
231
232
233
234
235 public T withoutRetries()
236 throws IOException, RuntimeException {
237
238 this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
239 try {
240 beforeCall();
241 prepare(false);
242 return call();
243 } catch (Throwable t) {
244 Throwable t2 = translateException(t);
245
246 if (t2 instanceof IOException) {
247 throw (IOException)t2;
248 } else {
249 throw new RuntimeException(t2);
250 }
251 } finally {
252 afterCall();
253 }
254 }
255
256
257
258
259
260
261
262 protected static Throwable translateException(Throwable t) throws DoNotRetryIOException {
263 if (t instanceof UndeclaredThrowableException) {
264 if(t.getCause() != null) {
265 t = t.getCause();
266 }
267 }
268 if (t instanceof RemoteException) {
269 t = ((RemoteException)t).unwrapRemoteException();
270 }
271 if (t instanceof ServiceException) {
272 ServiceException se = (ServiceException)t;
273 Throwable cause = se.getCause();
274 if (cause != null && cause instanceof DoNotRetryIOException) {
275 throw (DoNotRetryIOException)cause;
276 }
277 } else if (t instanceof DoNotRetryIOException) {
278 throw (DoNotRetryIOException)t;
279 }
280 return t;
281 }
282 }