1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  package org.apache.hadoop.hbase.client;
22  
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.hadoop.hbase.HBaseIOException;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.RegionLocations;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
35  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.RequestConverter;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  
41  import com.google.protobuf.ServiceException;
42  
43  
44  import java.io.IOException;
45  import java.io.InterruptedIOException;
46  import java.util.Collections;
47  import java.util.List;
48  import java.util.concurrent.CancellationException;
49  import java.util.concurrent.ExecutionException;
50  import java.util.concurrent.Executor;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Future;
53  import java.util.concurrent.RunnableFuture;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  
57  
58  
59  
60  
61  
62  
63  @InterfaceAudience.Private
64  public class RpcRetryingCallerWithReadReplicas {
65    private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
66  
67    protected final ExecutorService pool;
68    protected final ClusterConnection cConnection;
69    protected final Configuration conf;
70    protected final Get get;
71    protected final TableName tableName;
72    protected final int timeBeforeReplicas;
73    private final int callTimeout;
74    private final int retries;
75    private final RpcControllerFactory rpcControllerFactory;
76    private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
77  
78    public RpcRetryingCallerWithReadReplicas(
79        RpcControllerFactory rpcControllerFactory, TableName tableName,
80        ClusterConnection cConnection, final Get get,
81        ExecutorService pool, int retries, int callTimeout,
82        int timeBeforeReplicas) {
83      this.rpcControllerFactory = rpcControllerFactory;
84      this.tableName = tableName;
85      this.cConnection = cConnection;
86      this.conf = cConnection.getConfiguration();
87      this.get = get;
88      this.pool = pool;
89      this.retries = retries;
90      this.callTimeout = callTimeout;
91      this.timeBeforeReplicas = timeBeforeReplicas;
92      this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
93    }
94  
95    
96  
97  
98  
99  
100 
101   class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
102     final int id;
103     private final PayloadCarryingRpcController controller;
104 
105     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
106       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
107           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
108       this.id = id;
109       this.location = location;
110       this.controller = rpcControllerFactory.newController();
111       controller.setPriority(tableName);
112     }
113 
114     @Override
115     public void cancel() {
116       controller.startCancel();
117     }
118 
119     
120 
121 
122 
123 
124     @Override
125     public void prepare(final boolean reload) throws IOException {
126       if (controller.isCanceled()) return;
127 
128       if (Thread.interrupted()) {
129         throw new InterruptedIOException();
130       }
131 
132       if (reload || location == null) {
133         RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
134         location = id < rl.size() ? rl.getRegionLocation(id) : null;
135       }
136 
137       if (location == null || location.getServerName() == null) {
138         
139         
140         throw new HBaseIOException("There is no location for replica id #" + id);
141       }
142 
143       ServerName dest = location.getServerName();
144 
145       setStub(cConnection.getClient(dest));
146     }
147 
148     @Override
149     public Result call(int callTimeout) throws Exception {
150       if (controller.isCanceled()) return null;
151 
152       if (Thread.interrupted()) {
153         throw new InterruptedIOException();
154       }
155 
156       byte[] reg = location.getRegionInfo().getRegionName();
157 
158       ClientProtos.GetRequest request =
159           RequestConverter.buildGetRequest(reg, get);
160       controller.setCallTimeout(callTimeout);
161 
162       try {
163         ClientProtos.GetResponse response = getStub().get(controller, request);
164         if (response == null) {
165           return null;
166         }
167         return ProtobufUtil.toResult(response.getResult());
168       } catch (ServiceException se) {
169         throw ProtobufUtil.getRemoteException(se);
170       }
171     }
172 
173     @Override
174     public boolean isCancelled() {
175       return controller.isCanceled();
176     }
177   }
178 
179   
180 
181 
182 
183 
184 
185 
186 
187 
188 
189 
190 
191 
192 
193 
194 
195 
196 
197   public synchronized Result call()
198       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
199     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
200 
201     RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
202         : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
203     ResultBoundedCompletionService<Result> cs =
204         new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
205 
206     if(isTargetReplicaSpecified) {
207       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
208     } else {
209       addCallsForReplica(cs, rl, 0, 0);
210       try {
211         
212         Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); 
213         if (f != null) {
214           return f.get(); 
215         }
216       } catch (ExecutionException e) {
217         throwEnrichedException(e, retries);
218       } catch (CancellationException e) {
219         throw new InterruptedIOException();
220       } catch (InterruptedException e) {
221         throw new InterruptedIOException();
222       }
223 
224       
225       addCallsForReplica(cs, rl, 1, rl.size() - 1);
226     }
227 
228     try {
229       try {
230         Future<Result> f = cs.take();
231         return f.get();
232       } catch (ExecutionException e) {
233         throwEnrichedException(e, retries);
234       }
235     } catch (CancellationException e) {
236       throw new InterruptedIOException();
237     } catch (InterruptedException e) {
238       throw new InterruptedIOException();
239     } finally {
240       
241       
242       cs.cancelAll();
243     }
244 
245     return null; 
246   }
247 
248   
249 
250 
251 
252   static void throwEnrichedException(ExecutionException e, int retries)
253       throws RetriesExhaustedException, DoNotRetryIOException {
254     Throwable t = e.getCause();
255     assert t != null; 
256 
257     if (t instanceof RetriesExhaustedException) {
258       throw (RetriesExhaustedException) t;
259     }
260 
261     if (t instanceof DoNotRetryIOException) {
262       throw (DoNotRetryIOException) t;
263     }
264 
265     if (t instanceof NeedUnmanagedConnectionException) {
266       throw new DoNotRetryIOException(t);
267     }
268 
269     RetriesExhaustedException.ThrowableWithExtraContext qt =
270         new RetriesExhaustedException.ThrowableWithExtraContext(t,
271             EnvironmentEdgeManager.currentTime(), null);
272 
273     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
274         Collections.singletonList(qt);
275 
276     throw new RetriesExhaustedException(retries, exceptions);
277   }
278 
279   
280 
281 
282 
283 
284 
285 
286 
287   private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
288                                  RegionLocations rl, int min, int max) {
289     for (int id = min; id <= max; id++) {
290       HRegionLocation hrl = rl.getRegionLocation(id);
291       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
292       cs.submit(callOnReplica, callTimeout, id);
293     }
294   }
295 
296   static RegionLocations getRegionLocations(boolean useCache, int replicaId,
297                  ClusterConnection cConnection, TableName tableName, byte[] row)
298       throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
299 
300     RegionLocations rl;
301     try {
302       if (useCache) {
303         rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
304       } else {
305         rl = cConnection.relocateRegion(tableName, row, replicaId);
306       }
307     } catch (DoNotRetryIOException e) {
308       throw e;
309     } catch (NeedUnmanagedConnectionException e) {
310       throw new DoNotRetryIOException(e);
311     } catch (RetriesExhaustedException e) {
312       throw e;
313     } catch (InterruptedIOException e) {
314       throw e;
315     } catch (IOException e) {
316       throw new RetriesExhaustedException("Can't get the location", e);
317     }
318     if (rl == null) {
319       throw new RetriesExhaustedException("Can't get the locations");
320     }
321 
322     return rl;
323   }
324 }