1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.client;
22
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.DoNotRetryIOException;
28 import org.apache.hadoop.hbase.HBaseIOException;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.RegionLocations;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
35 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.RequestConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40
41 import com.google.protobuf.ServiceException;
42
43
44 import java.io.IOException;
45 import java.io.InterruptedIOException;
46 import java.util.Collections;
47 import java.util.List;
48 import java.util.concurrent.CancellationException;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.Executor;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Future;
53 import java.util.concurrent.RunnableFuture;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class RpcRetryingCallerWithReadReplicas {
65 private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
66
67 protected final ExecutorService pool;
68 protected final ClusterConnection cConnection;
69 protected final Configuration conf;
70 protected final Get get;
71 protected final TableName tableName;
72 protected final int timeBeforeReplicas;
73 private final int callTimeout;
74 private final int retries;
75 private final RpcControllerFactory rpcControllerFactory;
76 private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
77
78 public RpcRetryingCallerWithReadReplicas(
79 RpcControllerFactory rpcControllerFactory, TableName tableName,
80 ClusterConnection cConnection, final Get get,
81 ExecutorService pool, int retries, int callTimeout,
82 int timeBeforeReplicas) {
83 this.rpcControllerFactory = rpcControllerFactory;
84 this.tableName = tableName;
85 this.cConnection = cConnection;
86 this.conf = cConnection.getConfiguration();
87 this.get = get;
88 this.pool = pool;
89 this.retries = retries;
90 this.callTimeout = callTimeout;
91 this.timeBeforeReplicas = timeBeforeReplicas;
92 this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
93 }
94
95
96
97
98
99
100
101 class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
102 final int id;
103 private final PayloadCarryingRpcController controller;
104
105 public ReplicaRegionServerCallable(int id, HRegionLocation location) {
106 super(RpcRetryingCallerWithReadReplicas.this.cConnection,
107 RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
108 this.id = id;
109 this.location = location;
110 this.controller = rpcControllerFactory.newController();
111 controller.setPriority(tableName);
112 }
113
114 @Override
115 public void cancel() {
116 controller.startCancel();
117 }
118
119
120
121
122
123
124 @Override
125 public void prepare(final boolean reload) throws IOException {
126 if (controller.isCanceled()) return;
127
128 if (Thread.interrupted()) {
129 throw new InterruptedIOException();
130 }
131
132 if (reload || location == null) {
133 RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
134 location = id < rl.size() ? rl.getRegionLocation(id) : null;
135 }
136
137 if (location == null || location.getServerName() == null) {
138
139
140 throw new HBaseIOException("There is no location for replica id #" + id);
141 }
142
143 ServerName dest = location.getServerName();
144
145 setStub(cConnection.getClient(dest));
146 }
147
148 @Override
149 public Result call(int callTimeout) throws Exception {
150 if (controller.isCanceled()) return null;
151
152 if (Thread.interrupted()) {
153 throw new InterruptedIOException();
154 }
155
156 byte[] reg = location.getRegionInfo().getRegionName();
157
158 ClientProtos.GetRequest request =
159 RequestConverter.buildGetRequest(reg, get);
160 controller.setCallTimeout(callTimeout);
161
162 try {
163 ClientProtos.GetResponse response = getStub().get(controller, request);
164 if (response == null) {
165 return null;
166 }
167 return ProtobufUtil.toResult(response.getResult());
168 } catch (ServiceException se) {
169 throw ProtobufUtil.getRemoteException(se);
170 }
171 }
172
173 @Override
174 public boolean isCancelled() {
175 return controller.isCanceled();
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 public synchronized Result call()
198 throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
199 boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
200
201 RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
202 : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
203 ResultBoundedCompletionService<Result> cs =
204 new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
205
206 if(isTargetReplicaSpecified) {
207 addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
208 } else {
209 addCallsForReplica(cs, rl, 0, 0);
210 try {
211
212 Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS);
213 if (f != null) {
214 return f.get();
215 }
216 } catch (ExecutionException e) {
217 throwEnrichedException(e, retries);
218 } catch (CancellationException e) {
219 throw new InterruptedIOException();
220 } catch (InterruptedException e) {
221 throw new InterruptedIOException();
222 }
223
224
225 addCallsForReplica(cs, rl, 1, rl.size() - 1);
226 }
227
228 try {
229 try {
230 Future<Result> f = cs.take();
231 return f.get();
232 } catch (ExecutionException e) {
233 throwEnrichedException(e, retries);
234 }
235 } catch (CancellationException e) {
236 throw new InterruptedIOException();
237 } catch (InterruptedException e) {
238 throw new InterruptedIOException();
239 } finally {
240
241
242 cs.cancelAll();
243 }
244
245 return null;
246 }
247
248
249
250
251
252 static void throwEnrichedException(ExecutionException e, int retries)
253 throws RetriesExhaustedException, DoNotRetryIOException {
254 Throwable t = e.getCause();
255 assert t != null;
256
257 if (t instanceof RetriesExhaustedException) {
258 throw (RetriesExhaustedException) t;
259 }
260
261 if (t instanceof DoNotRetryIOException) {
262 throw (DoNotRetryIOException) t;
263 }
264
265 if (t instanceof NeedUnmanagedConnectionException) {
266 throw new DoNotRetryIOException(t);
267 }
268
269 RetriesExhaustedException.ThrowableWithExtraContext qt =
270 new RetriesExhaustedException.ThrowableWithExtraContext(t,
271 EnvironmentEdgeManager.currentTime(), null);
272
273 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
274 Collections.singletonList(qt);
275
276 throw new RetriesExhaustedException(retries, exceptions);
277 }
278
279
280
281
282
283
284
285
286
287 private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
288 RegionLocations rl, int min, int max) {
289 for (int id = min; id <= max; id++) {
290 HRegionLocation hrl = rl.getRegionLocation(id);
291 ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
292 cs.submit(callOnReplica, callTimeout, id);
293 }
294 }
295
296 static RegionLocations getRegionLocations(boolean useCache, int replicaId,
297 ClusterConnection cConnection, TableName tableName, byte[] row)
298 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
299
300 RegionLocations rl;
301 try {
302 if (useCache) {
303 rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
304 } else {
305 rl = cConnection.relocateRegion(tableName, row, replicaId);
306 }
307 } catch (DoNotRetryIOException e) {
308 throw e;
309 } catch (NeedUnmanagedConnectionException e) {
310 throw new DoNotRetryIOException(e);
311 } catch (RetriesExhaustedException e) {
312 throw e;
313 } catch (InterruptedIOException e) {
314 throw e;
315 } catch (IOException e) {
316 throw new RetriesExhaustedException("Can't get the location", e);
317 }
318 if (rl == null) {
319 throw new RetriesExhaustedException("Can't get the locations");
320 }
321
322 return rl;
323 }
324 }