1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellScanner;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.DoNotRetryIOException;
34  import org.apache.hadoop.hbase.HBaseIOException;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HRegionLocation;
37  import org.apache.hadoop.hbase.NotServingRegionException;
38  import org.apache.hadoop.hbase.RegionLocations;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.UnknownScannerException;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
44  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
45  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.protobuf.RequestConverter;
48  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
49  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
50  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
51  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
52  import org.apache.hadoop.ipc.RemoteException;
53  import org.apache.hadoop.net.DNS;
54  
55  import com.google.protobuf.ServiceException;
56  import com.google.protobuf.TextFormat;
57  
58  /**
59   * Scanner operations such as create, next, etc.
60   * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
61   * {@link RpcRetryingCaller} so that failures are retried.
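 * <p>
 * A rough usage sketch, for orientation only; {@code conf}, {@code connection} and {@code scan}
 * are assumed to exist and the timeout is an arbitrary example value:
 * <pre>
 *   RpcControllerFactory rpcFactory = RpcControllerFactory.instantiate(conf);
 *   ScannerCallable callable =
 *       new ScannerCallable(connection, TableName.valueOf("example"), scan, null, rpcFactory);
 *   callable.setCaching(100);       // rows to fetch per scan RPC
 *   callable.prepare(false);        // locate the region and set up the client stub
 *   callable.call(60000);           // first call only opens the scanner (returns null)
 *   Result[] batch = callable.call(60000);   // later calls fetch up to 'caching' rows
 *   callable.setClose();
 *   callable.call(60000);           // closes the scanner on the region server
 * </pre>
 * In practice the callable is not driven directly like this; it is handed to a retrying caller
 * (see above) so that recoverable failures are retried.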
62   */
63  @InterfaceAudience.Private
64  public class ScannerCallable extends RegionServerCallable<Result[]> {
65    public static final String LOG_SCANNER_LATENCY_CUTOFF
66      = "hbase.client.log.scanner.latency.cutoff";
67    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
68  
69    // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
70    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
71    protected long scannerId = -1L;
72    protected boolean instantiated = false;
73    protected boolean closed = false;
74    protected boolean renew = false;
75    private Scan scan;
76    private int caching = 1;
77    protected final ClusterConnection cConnection;
78    protected ScanMetrics scanMetrics;
79    private boolean logScannerActivity = false;
80    private int logCutOffLatency = 1000;
81    private static String myAddress;
82    protected final int id;
83    protected boolean serverHasMoreResultsContext;
84    protected boolean serverHasMoreResults;
85  
86    /**
87     * Saves whether or not the most recent response from the server was a heartbeat message.
88     * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
89     */
90    protected boolean heartbeatMessage = false;
91    static {
92      try {
93        myAddress = DNS.getDefaultHost("default", "default");
94      } catch (UnknownHostException uhe) {
95        LOG.error("cannot determine my address", uhe);
96      }
97    }
98  
99    // indicate if it is a remote server call
100   protected boolean isRegionServerRemote = true;
101   private long nextCallSeq = 0;
102   protected RpcControllerFactory controllerFactory;
103   protected PayloadCarryingRpcController controller;
104 
105   /**
106    * @param connection which connection
107    * @param tableName table callable is on
108    * @param scan the scan to execute
109    * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
110    *          metrics
111    * @param rpcControllerFactory factory to use when creating 
112    *        {@link com.google.protobuf.RpcController}
113    */
114   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
115       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
116     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
117   }
118   /**
119    * @param connection which connection
120    * @param tableName table callable is on
121    * @param scan the scan to execute
122    * @param scanMetrics the ScanMetrics to use; null means no metrics are collected
123    * @param rpcControllerFactory factory to use when creating rpc controllers
124    * @param id the replicaId
125    */
126   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
127       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
128     super(connection, tableName, scan.getStartRow());
129     this.id = id;
130     this.cConnection = connection;
131     this.scan = scan;
132     this.scanMetrics = scanMetrics;
133     Configuration conf = connection.getConfiguration();
134     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
135     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
136     this.controllerFactory = rpcControllerFactory;
137   }
138 
139   PayloadCarryingRpcController getController() {
140     return controller;
141   }
142 
143   /**
144    * @param reload force reload of server location
145    * @throws IOException
146    */
147   @Override
148   public void prepare(boolean reload) throws IOException {
149     if (Thread.interrupted()) {
150       throw new InterruptedIOException();
151     }
152     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
153         id, getConnection(), getTableName(), getRow());
154     location = id < rl.size() ? rl.getRegionLocation(id) : null;
155     if (location == null || location.getServerName() == null) {
156       // With this exception, there will be a retry. The location can be null for a replica
157       //  when the table is created or after a split.
158       throw new HBaseIOException("There is no location for replica id #" + id);
159     }
160     ServerName dest = location.getServerName();
161     setStub(super.getConnection().getClient(dest));
162     if (!instantiated || reload) {
163       checkIfRegionServerIsRemote();
164       instantiated = true;
165     }
166 
167     // If this is a reload (i.e. a retry), count it in the scan metrics.
168     if (reload && this.scanMetrics != null) {
169       this.scanMetrics.countOfRPCRetries.incrementAndGet();
170       if (isRegionServerRemote) {
171         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
172       }
173     }
174   }
175 
176   /**
177    * Compare the local machine's hostname with the region server's hostname
178    * to decide whether the HBase client is connecting to a remote region server.
179    */
180   protected void checkIfRegionServerIsRemote() {
181     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
182       isRegionServerRemote = false;
183     } else {
184       isRegionServerRemote = true;
185     }
186   }
187 
188 
189   @Override
190   public Result [] call(int callTimeout) throws IOException {
191     if (Thread.interrupted()) {
192       throw new InterruptedIOException();
193     }
194 
195     if (controller == null) {
196       controller = controllerFactory.newController();
197       controller.setPriority(getTableName());
198       controller.setCallTimeout(callTimeout);
199     }
200 
201     if (closed) {
202       if (scannerId != -1) {
203         close();
204       }
205     } else {
206       if (scannerId == -1L) {
207         this.scannerId = openScanner();
208       } else {
209         Result [] rrs = null;
210         ScanRequest request = null;
211         // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
212         setHeartbeatMessage(false);
213         try {
214           incRPCcallsMetrics();
215           request =
216               RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
217                 this.scanMetrics != null, renew);
218           ScanResponse response = null;
219           try {
220             response = getStub().scan(controller, request);
221             // The client and the RS maintain a nextCallSeq number during the scan. Every next()
222             // call from client to server increments this number on both sides. The client passes
223             // this number along with the request, and the RS compares the incoming nextCallSeq
224             // with its own. In case of a timeout, this increment must not happen on the client
225             // side. If the server had already finished fetching the next batch of data, the
226             // nextCallSeq numbers will not match. The server will throw
227             // OutOfOrderScannerNextException and the client will reopen the scanner with startrow
228             // set to the last successfully retrieved row.
229             // See HBASE-5974
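            // Illustrative example: the client sends next() with nextCallSeq=5 and the RPC times
            // out after the server has already advanced to 6; the retried request still carries 5,
            // so the server detects the mismatch, throws OutOfOrderScannerNextException, and the
            // client re-opens the scanner starting from the last row it successfully received.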
230             nextCallSeq++;
231             long timestamp = System.currentTimeMillis();
232             setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
233             // Results are returned via controller
234             CellScanner cellScanner = controller.cellScanner();
235             rrs = ResponseConverter.getResults(cellScanner, response);
236             if (logScannerActivity) {
237               long now = System.currentTimeMillis();
238               if (now - timestamp > logCutOffLatency) {
239                 int rows = rrs == null ? 0 : rrs.length;
240                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
241                   + rows + " rows from scanner=" + scannerId);
242               }
243             }
244             updateServerSideMetrics(response);
245             // moreResults is only used for the case where a filter exhausts all elements
246             if (response.hasMoreResults() && !response.getMoreResults()) {
247               scannerId = -1L;
248               closed = true;
249               // Implied that no results were returned back, either.
250               return null;
251             }
252             // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
253             // to size or quantity of results in the response.
254             if (response.hasMoreResultsInRegion()) {
255               // Set what the RS said
256               setHasMoreResultsContext(true);
257               setServerHasMoreResults(response.getMoreResultsInRegion());
258             } else {
259               // Server didn't respond whether it has more results or not.
260               setHasMoreResultsContext(false);
261             }
262           } catch (ServiceException se) {
263             throw ProtobufUtil.getRemoteException(se);
264           }
265           updateResultsMetrics(rrs);
266         } catch (IOException e) {
267           if (logScannerActivity) {
268             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
269               + " to " + getLocation(), e);
270           }
271           IOException ioe = e;
272           if (e instanceof RemoteException) {
273             ioe = ((RemoteException) e).unwrapRemoteException();
274           }
275           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
276             try {
277               HRegionLocation location =
278                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
279               LOG.info("Scanner=" + scannerId
280                 + " expired, current region location is " + location.toString());
281             } catch (Throwable t) {
282               LOG.info("Failed to relocate region", t);
283             }
284           }
285           // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
286           // Why not just have these exceptions implement DNRIOE, you ask? Well, usually we want
287           // ServerCallable#withRetries to just retry when it gets these exceptions. But here, in
288           // a scan when doing a next() in particular, we want to break out and get the scanner to
289           // set itself up again. Throwing a DNRIOE is how we signal this to happen (it's ugly,
290           // yeah, and hard to follow and in need of a refactor).
291           if (ioe instanceof NotServingRegionException) {
292             // Throw a DNRE so that we break out of cycle of calling NSRE
293             // when what we need is to open scanner against new location.
294             // Attach NSRE to signal client that it needs to re-setup scanner.
295             if (this.scanMetrics != null) {
296               this.scanMetrics.countOfNSRE.incrementAndGet();
297             }
298             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
299           } else if (ioe instanceof RegionServerStoppedException) {
300             // Throw a DNRE so that we break out of cycle of the retries and instead go and
301             // open scanner against new location.
302             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
303           } else {
304             // The outer layers will retry
305             throw ioe;
306           }
307         }
308         return rrs;
309       }
310     }
311     return null;
312   }
313 
314   /**
315    * @return true when the most recent RPC response indicated that the response was a heartbeat
316    *         message. Heartbeat messages are sent back from the server when the processing of the
317    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
318    *         timeouts during long running scan operations.
319    */
320   protected boolean isHeartbeatMessage() {
321     return heartbeatMessage;
322   }
323 
324   protected void setHeartbeatMessage(boolean heartbeatMessage) {
325     this.heartbeatMessage = heartbeatMessage;
326   }
327 
328   private void incRPCcallsMetrics() {
329     if (this.scanMetrics == null) {
330       return;
331     }
332     this.scanMetrics.countOfRPCcalls.incrementAndGet();
333     if (isRegionServerRemote) {
334       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
335     }
336   }
337 
338   protected void updateResultsMetrics(Result[] rrs) {
339     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
340       return;
341     }
342     long resultSize = 0;
343     for (Result rr : rrs) {
344       for (Cell cell : rr.rawCells()) {
345         resultSize += CellUtil.estimatedSerializedSizeOf(cell);
346       }
347     }
348     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
349     if (isRegionServerRemote) {
350       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
351     }
352   }
353 
354   /**
355    * Use the scan metrics returned by the server to add to the identically named counters in the
356    * client side metrics. If a counter does not exist with the same name as the server side metric,
357    * the attempt to increase the counter will fail.
358    * @param response the ScanResponse that may carry server-side scan metrics
359    */
360   private void updateServerSideMetrics(ScanResponse response) {
361     if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
362 
363     Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
364     for (Entry<String, Long> entry : serverMetrics.entrySet()) {
365       this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
366     }
367   }
368 
369   private void close() {
370     if (this.scannerId == -1L) {
371       return;
372     }
373     try {
374       incRPCcallsMetrics();
375       ScanRequest request =
376           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
377       try {
378         getStub().scan(controller, request);
379       } catch (ServiceException se) {
380         throw ProtobufUtil.getRemoteException(se);
381       }
382     } catch (IOException e) {
383       LOG.warn("Ignore, probably already closed", e);
384     }
385     this.scannerId = -1L;
386   }
387 
388   protected long openScanner() throws IOException {
389     incRPCcallsMetrics();
390     ScanRequest request =
391       RequestConverter.buildScanRequest(
392         getLocation().getRegionInfo().getRegionName(),
393         this.scan, 0, false);
394     try {
395       ScanResponse response = getStub().scan(controller, request);
396       long id = response.getScannerId();
397       if (logScannerActivity) {
398         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
399           + " on region " + getLocation().toString());
400       }
401       return id;
402     } catch (ServiceException se) {
403       throw ProtobufUtil.getRemoteException(se);
404     }
405   }
406 
407   protected Scan getScan() {
408     return scan;
409   }
410 
411   /**
412    * Call this when the next invocation of call() should close the scanner.
413    */
414   public void setClose() {
415     this.closed = true;
416   }
417 
418   /**
419    * Indicate whether we make a call only to renew the lease, without affecting the scanner in
420    * any other way.
421    * @param val true if only the lease should be renewed
422    */
423   public void setRenew(boolean val) {
424     this.renew = val;
425   }
426 
427   /**
428    * @return the HRegionInfo for the current region
429    */
430   @Override
431   public HRegionInfo getHRegionInfo() {
432     if (!instantiated) {
433       return null;
434     }
435     return getLocation().getRegionInfo();
436   }
437 
438   /**
439    * Get the number of rows that will be fetched per scan RPC.
440    * @return the number of rows for caching
441    */
442   public int getCaching() {
443     return caching;
444   }
445 
446   @Override
447   public ClusterConnection getConnection() {
448     return cConnection;
449   }
450 
451   /**
452    * Set the number of rows that will be fetched per scan RPC.
453    * @param caching the number of rows for caching
454    */
455   public void setCaching(int caching) {
456     this.caching = caching;
457   }
458 
459   public ScannerCallable getScannerCallableForReplica(int id) {
460     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
461         this.getScan(), this.scanMetrics, controllerFactory, id);
462     s.setCaching(this.caching);
463     return s;
464   }
465 
466   /**
467    * Should the client attempt to fetch more results from this region?
468    * @return True if the client should attempt to fetch more results, false otherwise.
469    */
470   protected boolean getServerHasMoreResults() {
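        // Only meaningful when the server actually reported whether more results remain;
        // see hasMoreResultsContext().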
471     assert serverHasMoreResultsContext;
472     return this.serverHasMoreResults;
473   }
474 
475   protected void setServerHasMoreResults(boolean serverHasMoreResults) {
476     this.serverHasMoreResults = serverHasMoreResults;
477   }
478 
479   /**
480    * Did the server respond with information about whether more results might exist?
481    * Older server versions are not guaranteed to provide this information.
482    * @return True if the server responded with information about more results.
483    */
484   protected boolean hasMoreResultsContext() {
485     return serverHasMoreResultsContext;
486   }
487 
488   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
489     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
490   }
491 }