/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;

import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;

/**
 * Scanner operations such as create, next, etc.
 * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
 * {@link RpcRetryingCaller} so that failures are retried.
 */
@InterfaceAudience.Private
public class ScannerCallable extends RegionServerCallable<Result[]> {
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
  protected long scannerId = -1L;
  protected boolean instantiated = false;
  protected boolean closed = false;
  protected boolean renew = false;
  private Scan scan;
  private int caching = 1;
  protected final ClusterConnection cConnection;
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  private static String myAddress;
  protected final int id;
  protected boolean serverHasMoreResultsContext;
  protected boolean serverHasMoreResults;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;
  static {
    try {
      myAddress = DNS.getDefaultHost("default", "default");
    } catch (UnknownHostException uhe) {
      LOG.error("cannot determine my address", uhe);
    }
  }

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  private long nextCallSeq = 0;
  protected RpcControllerFactory controllerFactory;
  protected PayloadCarryingRpcController controller;

  /**
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null,
   *        ScannerCallable won't collect metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
    this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
  }

  /**
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null,
   *        ScannerCallable won't collect metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   * @param id the replicaId
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
    super(connection, tableName, scan.getStartRow());
    this.id = id;
    this.cConnection = connection;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.controllerFactory = rpcControllerFactory;
  }
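
  /*
   * Illustrative only -- a rough sketch of how a ScannerCallable might be driven by a retrying
   * caller, per the class comment. The scanner classes (e.g. ClientScanner) normally own this
   * logic; the table name, caching value and timeouts below are made-up values, and the exact
   * caller-factory calls are assumptions rather than a recommended recipe.
   *
   *   ClusterConnection conn = ...;
   *   Scan scan = new Scan();
   *   ScannerCallable callable = new ScannerCallable(conn, TableName.valueOf("myTable"), scan,
   *       null, RpcControllerFactory.instantiate(conn.getConfiguration()));
   *   callable.setCaching(100);
   *   RpcRetryingCaller<Result[]> caller =
   *       new RpcRetryingCallerFactory(conn.getConfiguration()).<Result[]>newCaller();
   *   caller.callWithRetries(callable, 60000);                    // first call opens the scanner
   *   Result[] batch = caller.callWithRetries(callable, 60000);   // later calls fetch batches
   *   callable.setClose();
   *   caller.callWithRetries(callable, 60000);                    // final call closes the scanner
   */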

  PayloadCarryingRpcController getController() {
    return controller;
  }

  /**
   * @param reload force reload of server location
   * @throws IOException
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
        id, getConnection(), getTableName(), getRow());
    location = id < rl.size() ? rl.getRegionLocation(id) : null;
    if (location == null || location.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      // when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }

    // Count retries. HConnectionManager will call prepare with reload==true
    // only when it is retrying.
    if (reload && this.scanMetrics != null) {
      this.scanMetrics.countOfRPCRetries.incrementAndGet();
      if (isRegionServerRemote) {
        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
      }
    }
  }

  /**
   * Compare the local machine's hostname with the region server's hostname
   * to decide whether the HBase client is connecting to a remote region server.
   */
  protected void checkIfRegionServerIsRemote() {
    if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
      isRegionServerRemote = false;
    } else {
      isRegionServerRemote = true;
    }
  }

  @Override
  public Result [] call(int callTimeout) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      if (scannerId != -1) {
        close();
      }
    } else {
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
        setHeartbeatMessage(false);
        try {
          incRPCcallsMetrics();
          request =
              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                this.scanMetrics != null, renew);
          ScanResponse response = null;
          controller = controllerFactory.newController();
          controller.setPriority(getTableName());
          controller.setCallTimeout(callTimeout);
          try {
            response = getStub().scan(controller, request);
            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
            // from client to server increments this number on both sides. The client passes this
            // number along with the request, and on the RS side the incoming nextCallSeq is
            // matched against the server's own. In case of a timeout, this increment on the
            // client side should not happen. If the server had already finished fetching the
            // next batch of data, there will be a mismatch in the nextCallSeq numbers. The server
            // will throw OutOfOrderScannerNextException and the client will then reopen the
            // scanner with the last successfully retrieved row as the start row.
            // See HBASE-5974
            nextCallSeq++;
            long timestamp = System.currentTimeMillis();
            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
            // Results are returned via controller
            CellScanner cellScanner = controller.cellScanner();
            rrs = ResponseConverter.getResults(cellScanner, response);
            if (logScannerActivity) {
              long now = System.currentTimeMillis();
              if (now - timestamp > logCutOffLatency) {
                int rows = rrs == null ? 0 : rrs.length;
                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                  + rows + " rows from scanner=" + scannerId);
              }
            }
            updateServerSideMetrics(response);
            // moreResults is only used for the case where a filter exhausts all elements
            if (response.hasMoreResults() && !response.getMoreResults()) {
              scannerId = -1L;
              closed = true;
              // Implied that no results were returned back, either.
              return null;
            }
            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
            // to size or quantity of results in the response.
            if (response.hasMoreResultsInRegion()) {
              // Set what the RS said
              setHasMoreResultsContext(true);
              setServerHasMoreResults(response.getMoreResultsInRegion());
            } else {
              // Server didn't respond whether it has more results or not.
              setHasMoreResultsContext(false);
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
          updateResultsMetrics(rrs);
        } catch (IOException e) {
          if (logScannerActivity) {
            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
              + " to " + getLocation(), e);
          }
          IOException ioe = e;
          if (e instanceof RemoteException) {
            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
          }
          if (logScannerActivity) {
            if (ioe instanceof UnknownScannerException) {
              try {
                HRegionLocation location =
                    getConnection().relocateRegion(getTableName(), scan.getStartRow());
                LOG.info("Scanner=" + scannerId
                  + " expired, current region location is " + location.toString());
              } catch (Throwable t) {
                LOG.info("Failed to relocate region", t);
              }
            } else if (ioe instanceof ScannerResetException) {
              LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
                  + "asked us to reset the scanner state.", ioe);
            }
          }
          // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
          // Why not just have these exceptions implement DNRIOE, you ask? Well, usually we want
          // ServerCallable#withRetries to just retry when it gets these exceptions. Here, in
          // a scan, when doing a next() in particular, we want to break out and get the scanner
          // set up again. Throwing a DNRIOE is how we signal this should happen (it's ugly,
          // hard to follow, and in need of a refactor).
          if (ioe instanceof NotServingRegionException) {
            // Throw a DNRE so that we break out of cycle of calling NSRE
            // when what we need is to open scanner against new location.
            // Attach NSRE to signal client that it needs to re-setup scanner.
            if (this.scanMetrics != null) {
              this.scanMetrics.countOfNSRE.incrementAndGet();
            }
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else if (ioe instanceof RegionServerStoppedException) {
            // Throw a DNRE so that we break out of cycle of the retries and instead go and
            // open scanner against new location.
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else {
            // The outer layers will retry
            throw ioe;
          }
        }
        return rrs;
      }
    }
    return null;
  }
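
  /*
   * A worked example of the nextCallSeq handshake described in call() above, with hypothetical
   * values (the real bookkeeping lives in the client and server scan code; this is illustration
   * only):
   *
   *   client sends scan(scannerId, nextCallSeq=0)   -> server expects 0, returns a batch and
   *                                                    bumps its counter to 1
   *   client bumps its nextCallSeq to 1
   *   client sends scan(scannerId, nextCallSeq=1)   -> the RPC times out after the server has
   *                                                    already processed it and bumped to 2; the
   *                                                    client saw no response, so it does NOT bump
   *   client retries scan(scannerId, nextCallSeq=1) -> server expects 2, sees 1, and throws
   *                                                    OutOfOrderScannerNextException
   *   the caller then reopens the scanner starting from the last successfully retrieved row
   *   (see HBASE-5974).
   */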

  /**
   * @return true when the most recent RPC response was a heartbeat message. Heartbeat messages
   *         are sent back from the server when the processing of the scan request exceeds a
   *         certain time threshold. Heartbeats keep the scan RPC from timing out during long
   *         running scan operations.
   */
  protected boolean isHeartbeatMessage() {
    return heartbeatMessage;
  }

  protected void setHeartbeatMessage(boolean heartbeatMessage) {
    this.heartbeatMessage = heartbeatMessage;
  }
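
  /*
   * Illustrative only -- one way a caller might react to heartbeat responses. A heartbeat may
   * carry few or no results, so the loop keeps calling until a non-heartbeat response (or a
   * real batch) arrives. The loop shape and the variable names are assumptions; the real
   * handling lives in the client scanner classes.
   *
   *   Result[] values;
   *   do {
   *     values = callable.call(callTimeout);
   *     // isHeartbeatMessage() only describes the most recent call
   *   } while ((values == null || values.length == 0) && callable.isHeartbeatMessage());
   */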

  private void incRPCcallsMetrics() {
    if (this.scanMetrics == null) {
      return;
    }
    this.scanMetrics.countOfRPCcalls.incrementAndGet();
    if (isRegionServerRemote) {
      this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
    }
  }

  protected void updateResultsMetrics(Result[] rrs) {
    if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
      return;
    }
    long resultSize = 0;
    for (Result rr : rrs) {
      for (Cell cell : rr.rawCells()) {
        resultSize += CellUtil.estimatedSerializedSizeOf(cell);
      }
    }
    this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
    if (isRegionServerRemote) {
      this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
    }
  }

  /**
   * Use the scan metrics returned by the server to add to the identically named counters in the
   * client side metrics. If a counter does not exist with the same name as the server side metric,
   * the attempt to increase the counter will fail.
   * @param response the scan response, possibly carrying server-side scan metrics
   */
  private void updateServerSideMetrics(ScanResponse response) {
    if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;

    Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
    for (Entry<String, Long> entry : serverMetrics.entrySet()) {
      this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
    }
  }

  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCcallsMetrics();
      ScanRequest request =
          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
      try {
        getStub().scan(null, request);
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      }
    } catch (IOException e) {
      LOG.warn("Ignore, probably already closed", e);
    }
    this.scannerId = -1L;
  }

  protected long openScanner() throws IOException {
    incRPCcallsMetrics();
    ScanRequest request =
      RequestConverter.buildScanRequest(
        getLocation().getRegionInfo().getRegionName(),
        this.scan, 0, false);
    try {
      ScanResponse response = getStub().scan(null, request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
          + " on region " + getLocation().toString());
      }
      return id;
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * Indicate whether we make a call only to renew the lease, without affecting the scanner in
   * any other way.
   * @param val true if only the lease should be renewed
   */
  public void setRenew(boolean val) {
    this.renew = val;
  }
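
  /*
   * Illustrative only -- a sketch of a lease-renewal call, assuming the renewal is issued through
   * the same retrying-caller machinery as a normal next(). With renew=true the ScanRequest built
   * in call() asks the server to keep the lease alive without advancing the scan, so no rows are
   * expected back. The exact call pattern is an assumption, not the canonical usage.
   *
   *   callable.setRenew(true);
   *   try {
   *     caller.callWithoutRetries(callable, renewLeaseTimeout);
   *   } finally {
   *     callable.setRenew(false);
   *   }
   */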

  /**
   * @return the HRegionInfo for the current region
   */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on next
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  @Override
  public ClusterConnection getConnection() {
    return cConnection;
  }

  /**
   * Set the number of rows that will be fetched on next
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
        this.getScan(), this.scanMetrics, controllerFactory, id);
    s.setCaching(this.caching);
    return s;
  }

  /**
   * Should the client attempt to fetch more results from this region?
   * @return True if the client should attempt to fetch more results, false otherwise.
   */
  protected boolean getServerHasMoreResults() {
    assert serverHasMoreResultsContext;
    return this.serverHasMoreResults;
  }

  protected void setServerHasMoreResults(boolean serverHasMoreResults) {
    this.serverHasMoreResults = serverHasMoreResults;
  }

  /**
   * Did the server respond with information about whether more results might exist?
   * Older server versions are not guaranteed to provide this information.
   * @return True if the server responded with information about more results.
   */
  protected boolean hasMoreResultsContext() {
    return serverHasMoreResultsContext;
  }

  protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
    this.serverHasMoreResultsContext = serverHasMoreResultsContext;
  }
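
  /*
   * Illustrative only -- how a caller might combine the two flags above when deciding whether to
   * keep scanning the current region. The fallback heuristic is an assumption for illustration;
   * the real decision is made in the client scanner classes.
   *
   *   boolean regionHasMore;
   *   if (callable.hasMoreResultsContext()) {
   *     // Newer servers state explicitly whether the region can return more rows.
   *     regionHasMore = callable.getServerHasMoreResults();
   *   } else {
   *     // Older servers don't, so fall back to a client-side guess, e.g. "did we get a full
   *     // batch of 'caching' rows?"
   *     regionHasMore = values != null && values.length == callable.getCaching();
   *   }
   */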
}