View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.CancellationException;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.RegionLocations;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Pair;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  
49  /**
50   * This class has the logic for handling scanners for regions with and without replicas.
51   * 1. A scan is attempted on the default (primary) region
52   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
53   * is a timeout on the default (a timeout of zero is disallowed).
54   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
55   * 4. The results from the first successful scanner are taken, and it is stored which server
56   * returned the results.
57   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
58   * in which case, the other replicas are queried (as in (3) above).
59   *
60   */
61  @InterfaceAudience.Private
62  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
63    private final Log LOG = LogFactory.getLog(this.getClass());
64    volatile ScannerCallable currentScannerCallable;
65    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
66    final ClusterConnection cConnection;
67    protected final ExecutorService pool;
68    protected final int timeBeforeReplicas;
69    private final Scan scan;
70    private final int retries;
71    private Result lastResult;
72    private final RpcRetryingCaller<Result[]> caller;
73    private final TableName tableName;
74    private Configuration conf;
75    private int scannerTimeout;
76    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
77    private boolean someRPCcancelled = false; //required for testing purposes only
78  
79    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
80        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
81        int retries, int scannerTimeout, int caching, Configuration conf,
82        RpcRetryingCaller<Result []> caller) {
83      this.currentScannerCallable = baseCallable;
84      this.cConnection = cConnection;
85      this.pool = pool;
86      if (timeBeforeReplicas < 0) {
87        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
88      }
89      this.timeBeforeReplicas = timeBeforeReplicas;
90      this.scan = scan;
91      this.retries = retries;
92      this.tableName = tableName;
93      this.conf = conf;
94      this.scannerTimeout = scannerTimeout;
95      this.caller = caller;
96    }
97  
98    public void setClose() {
99      currentScannerCallable.setClose();
100   }
101 
102   public void setCaching(int caching) {
103     currentScannerCallable.setCaching(caching);
104   }
105 
106   public int getCaching() {
107     return currentScannerCallable.getCaching();
108   }
109 
110   public HRegionInfo getHRegionInfo() {
111     return currentScannerCallable.getHRegionInfo();
112   }
113 
114   @Override
115   public Result [] call(int timeout) throws IOException {
116     // If the active replica callable was closed somewhere, invoke the RPC to
117     // really close it. In the case of regular scanners, this applies. We make couple
118     // of RPCs to a RegionServer, and when that region is exhausted, we set
119     // the closed flag. Then an RPC is required to actually close the scanner.
120     if (currentScannerCallable != null && currentScannerCallable.closed) {
121       // For closing we target that exact scanner (and not do replica fallback like in
122       // the case of normal reads)
123       if (LOG.isTraceEnabled()) {
124         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
125       }
126       Result[] r = currentScannerCallable.call(timeout);
127       currentScannerCallable = null;
128       return r;
129     }
130     // We need to do the following:
131     //1. When a scan goes out to a certain replica (default or not), we need to
132     //   continue to hit that until there is a failure. So store the last successfully invoked
133     //   replica
134     //2. We should close the "losing" scanners (scanners other than the ones we hear back
135     //   from first)
136     //
137     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
138         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
139         currentScannerCallable.getRow());
140 
141     // allocate a boundedcompletion pool of some multiple of number of replicas.
142     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
143     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
144         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
145             new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
146             rl.size() * 5);
147 
148     List<ExecutionException> exceptions = null;
149     int submitted = 0, completed = 0;
150     AtomicBoolean done = new AtomicBoolean(false);
151     replicaSwitched.set(false);
152     // submit call for the primary replica.
153     submitted += addCallsForCurrentReplica(cs, rl);
154     try {
155       // wait for the timeout to see whether the primary responds back
156       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
157           TimeUnit.MICROSECONDS); // Yes, microseconds
158       if (f != null) {
159         Pair<Result[], ScannerCallable> r = f.get();
160         if (r != null && r.getSecond() != null) {
161           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
162         }
163         return r == null ? null : r.getFirst(); //great we got a response
164       }
165     } catch (ExecutionException e) {
166       // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
167       // but the secondaries might still succeed. Continue on the replica RPCs.
168       exceptions = new ArrayList<ExecutionException>(rl.size());
169       exceptions.add(e);
170       completed++;
171     } catch (CancellationException e) {
172       throw new InterruptedIOException(e.getMessage());
173     } catch (InterruptedException e) {
174       throw new InterruptedIOException(e.getMessage());
175     }
176     // submit call for the all of the secondaries at once
177     // TODO: this may be an overkill for large region replication
178     submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
179     try {
180       while (completed < submitted) {
181         try {
182           Future<Pair<Result[], ScannerCallable>> f = cs.take();
183           Pair<Result[], ScannerCallable> r = f.get();
184           if (r != null && r.getSecond() != null) {
185             updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
186           }
187           return r == null ? null : r.getFirst(); // great we got an answer
188         } catch (ExecutionException e) {
189           // if not cancel or interrupt, wait until all RPC's are done
190           // one of the tasks failed. Save the exception for later.
191           if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
192           exceptions.add(e);
193           completed++;
194         }
195       }
196     } catch (CancellationException e) {
197       throw new InterruptedIOException(e.getMessage());
198     } catch (InterruptedException e) {
199       throw new InterruptedIOException(e.getMessage());
200     } finally {
201       // We get there because we were interrupted or because one or more of the
202       // calls succeeded or failed. In all case, we stop all our tasks.
203       cs.cancelAll();
204     }
205 
206     if (exceptions != null && !exceptions.isEmpty()) {
207       RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
208           retries); // just rethrow the first exception for now.
209     }
210     return null; // unreachable
211   }
212 
213   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
214       AtomicBoolean done, ExecutorService pool) {
215     if (done.compareAndSet(false, true)) {
216       if (currentScannerCallable != scanner) replicaSwitched.set(true);
217       currentScannerCallable = scanner;
218       // store where to start the replica scanner from if we need to.
219       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
220       if (LOG.isTraceEnabled()) {
221         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
222             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
223       }
224       // close all outstanding replica scanners but the one we heard back from
225       outstandingCallables.remove(scanner);
226       for (ScannerCallable s : outstandingCallables) {
227         if (LOG.isTraceEnabled()) {
228           LOG.trace("Closing scanner id=" + s.scannerId +
229             ", replica=" + s.getHRegionInfo().getRegionId() +
230             " because slow and replica=" +
231             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
232         }
233         // Submit the "close" to the pool since this might take time, and we don't
234         // want to wait for the "close" to happen yet. The "wait" will happen when
235         // the table is closed (when the awaitTermination of the underlying pool is called)
236         s.setClose();
237         final RetryingRPC r = new RetryingRPC(s);
238         pool.submit(new Callable<Void>(){
239           @Override
240           public Void call() throws Exception {
241             r.call(scannerTimeout);
242             return null;
243           }
244         });
245       }
246       // now clear outstandingCallables since we scheduled a close for all the contained scanners
247       outstandingCallables.clear();
248     }
249   }
250 
251   /**
252    * When a scanner switches in the middle of scanning (the 'next' call fails
253    * for example), the upper layer {@link ClientScanner} needs to know
254    * @return
255    */
256   public boolean switchedToADifferentReplica() {
257     return replicaSwitched.get();
258   }
259 
260   private int addCallsForCurrentReplica(
261       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
262     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
263     outstandingCallables.add(currentScannerCallable);
264     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
265     return 1;
266   }
267 
268   private int addCallsForOtherReplicas(
269       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
270       int min, int max) {
271     if (scan.getConsistency() == Consistency.STRONG) {
272       return 0; // not scheduling on other replicas for strong consistency
273     }
274     for (int id = min; id <= max; id++) {
275       if (currentScannerCallable.id == id) {
276         continue; //this was already scheduled earlier
277       }
278       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
279 
280       if (this.lastResult != null) {
281         if(s.getScan().isReversed()){
282           s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
283         }else {
284           s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
285         }
286       }
287       outstandingCallables.add(s);
288       RetryingRPC retryingOnReplica = new RetryingRPC(s);
289       cs.submit(retryingOnReplica, scannerTimeout, id);
290     }
291     return max - min + 1;
292   }
293 
294   @VisibleForTesting
295   boolean isAnyRPCcancelled() {
296     return someRPCcancelled;
297   }
298 
299   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
300     final ScannerCallable callable;
301     RpcRetryingCaller<Result[]> caller;
302     private volatile boolean cancelled = false;
303 
304     RetryingRPC(ScannerCallable callable) {
305       this.callable = callable;
306       // For the Consistency.STRONG (default case), we reuse the caller
307       // to keep compatibility with what is done in the past
308       // For the Consistency.TIMELINE case, we can't reuse the caller
309       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
310       // and we can't invoke it multiple times at the same time)
311       this.caller = ScannerCallableWithReplicas.this.caller;
312       if (scan.getConsistency() == Consistency.TIMELINE) {
313         this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
314             <Result[]>newCaller();
315       }
316     }
317 
318     @Override
319     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
320       // since the retries is done within the ResultBoundedCompletionService,
321       // we don't invoke callWithRetries here
322       if (cancelled) {
323         return null;
324       }
325       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
326       return new Pair<Result[], ScannerCallable>(res, this.callable);
327     }
328 
329     @Override
330     public void prepare(boolean reload) throws IOException {
331       if (cancelled) return;
332 
333       if (Thread.interrupted()) {
334         throw new InterruptedIOException();
335       }
336 
337       callable.prepare(reload);
338     }
339 
340     @Override
341     public void throwable(Throwable t, boolean retrying) {
342       callable.throwable(t, retrying);
343     }
344 
345     @Override
346     public String getExceptionMessageAdditionalDetail() {
347       return callable.getExceptionMessageAdditionalDetail();
348     }
349 
350     @Override
351     public long sleep(long pause, int tries) {
352       return callable.sleep(pause, tries);
353     }
354 
355     @Override
356     public void cancel() {
357       cancelled = true;
358       caller.cancel();
359       if (callable.getController() != null) {
360         callable.getController().startCancel();
361       }
362       someRPCcancelled = true;
363     }
364 
365     @Override
366     public boolean isCancelled() {
367       return cancelled;
368     }
369   }
370 
371   @Override
372   public void prepare(boolean reload) throws IOException {
373   }
374 
375   @Override
376   public void throwable(Throwable t, boolean retrying) {
377     currentScannerCallable.throwable(t, retrying);
378   }
379 
380   @Override
381   public String getExceptionMessageAdditionalDetail() {
382     return currentScannerCallable.getExceptionMessageAdditionalDetail();
383   }
384 
385   @Override
386   public long sleep(long pause, int tries) {
387     return currentScannerCallable.sleep(pause, tries);
388   }
389 }