View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.DoNotRetryIOException;
32  import org.apache.hadoop.hbase.HBaseIOException;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HRegionLocation;
35  import org.apache.hadoop.hbase.NotServingRegionException;
36  import org.apache.hadoop.hbase.RegionLocations;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.UnknownScannerException;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
42  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
43  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
44  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
45  import org.apache.hadoop.hbase.protobuf.RequestConverter;
46  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
47  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
48  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
49  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
50  import org.apache.hadoop.ipc.RemoteException;
51  import org.apache.hadoop.net.DNS;
52  
53  import com.google.protobuf.ServiceException;
54  import com.google.protobuf.TextFormat;
55  
56  /**
57   * Scanner operations such as create, next, etc.
58   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
59   * {@link RpcRetryingCaller} so fails are retried.
60   */
61  @InterfaceAudience.Private
62  public class ScannerCallable extends RegionServerCallable<Result[]> {
63    public static final String LOG_SCANNER_LATENCY_CUTOFF
64      = "hbase.client.log.scanner.latency.cutoff";
65    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
66  
67    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
68    protected long scannerId = -1L;
69    protected boolean instantiated = false;
70    protected boolean closed = false;
71    private Scan scan;
72    private int caching = 1;
73    protected final ClusterConnection cConnection;
74    protected ScanMetrics scanMetrics;
75    private boolean logScannerActivity = false;
76    private int logCutOffLatency = 1000;
77    private static String myAddress;
78    protected final int id;
79    protected boolean serverHasMoreResultsContext;
80    protected boolean serverHasMoreResults;
81  
82    /**
83     * Saves whether or not the most recent response from the server was a heartbeat message.
84     * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
85     */
86    protected boolean heartbeatMessage = false;
87    static {
88      try {
89        myAddress = DNS.getDefaultHost("default", "default");
90      } catch (UnknownHostException uhe) {
91        LOG.error("cannot determine my address", uhe);
92      }
93    }
94  
95    // indicate if it is a remote server call
96    protected boolean isRegionServerRemote = true;
97    private long nextCallSeq = 0;
98    protected RpcControllerFactory controllerFactory;
99    protected PayloadCarryingRpcController controller;
100 
101   /**
102    * @param connection which connection
103    * @param tableName table callable is on
104    * @param scan the scan to execute
105    * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
106    *          metrics
107    * @param rpcControllerFactory factory to use when creating 
108    *        {@link com.google.protobuf.RpcController}
109    */
110   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
111       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
112     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
113   }
114   /**
115    *
116    * @param connection
117    * @param tableName
118    * @param scan
119    * @param scanMetrics
120    * @param id the replicaId
121    */
122   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
123       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
124     super(connection, tableName, scan.getStartRow());
125     this.id = id;
126     this.cConnection = connection;
127     this.scan = scan;
128     this.scanMetrics = scanMetrics;
129     Configuration conf = connection.getConfiguration();
130     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
131     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
132     this.controllerFactory = rpcControllerFactory;
133   }
134 
135   PayloadCarryingRpcController getController() {
136     return controller;
137   }
138 
139   /**
140    * @param reload force reload of server location
141    * @throws IOException
142    */
143   @Override
144   public void prepare(boolean reload) throws IOException {
145     if (Thread.interrupted()) {
146       throw new InterruptedIOException();
147     }
148     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
149         id, getConnection(), getTableName(), getRow());
150     location = id < rl.size() ? rl.getRegionLocation(id) : null;
151     if (location == null || location.getServerName() == null) {
152       // With this exception, there will be a retry. The location can be null for a replica
153       //  when the table is created or after a split.
154       throw new HBaseIOException("There is no location for replica id #" + id);
155     }
156     ServerName dest = location.getServerName();
157     setStub(super.getConnection().getClient(dest));
158     if (!instantiated || reload) {
159       checkIfRegionServerIsRemote();
160       instantiated = true;
161     }
162 
163     // check how often we retry.
164     if (reload && this.scanMetrics != null) {
165       this.scanMetrics.countOfRPCRetries.incrementAndGet();
166       if (isRegionServerRemote) {
167         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
168       }
169     }
170   }
171 
172   /**
173    * compare the local machine hostname with region server's hostname
174    * to decide if hbase client connects to a remote region server
175    */
176   protected void checkIfRegionServerIsRemote() {
177     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
178       isRegionServerRemote = false;
179     } else {
180       isRegionServerRemote = true;
181     }
182   }
183 
184 
185   @Override
186   public Result [] call(int callTimeout) throws IOException {
187     if (Thread.interrupted()) {
188       throw new InterruptedIOException();
189     }
190     if (closed) {
191       if (scannerId != -1) {
192         close();
193       }
194     } else {
195       if (scannerId == -1L) {
196         this.scannerId = openScanner();
197       } else {
198         Result [] rrs = null;
199         ScanRequest request = null;
200         // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
201         setHeartbeatMessage(false);
202         try {
203           incRPCcallsMetrics();
204           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
205           ScanResponse response = null;
206           controller = controllerFactory.newController();
207           controller.setPriority(getTableName());
208           controller.setCallTimeout(callTimeout);
209           try {
210             response = getStub().scan(controller, request);
211             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
212             // from client to server will increment this number in both sides. Client passes this
213             // number along with the request and at RS side both the incoming nextCallSeq and its
214             // nextCallSeq will be matched. In case of a timeout this increment at the client side
215             // should not happen. If at the server side fetching of next batch of data was over,
216             // there will be mismatch in the nextCallSeq number. Server will throw
217             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
218             // as the last successfully retrieved row.
219             // See HBASE-5974
220             nextCallSeq++;
221             long timestamp = System.currentTimeMillis();
222             setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
223             // Results are returned via controller
224             CellScanner cellScanner = controller.cellScanner();
225             rrs = ResponseConverter.getResults(cellScanner, response);
226             if (logScannerActivity) {
227               long now = System.currentTimeMillis();
228               if (now - timestamp > logCutOffLatency) {
229                 int rows = rrs == null ? 0 : rrs.length;
230                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
231                   + rows + " rows from scanner=" + scannerId);
232               }
233             }
234             // moreResults is only used for the case where a filter exhausts all elements
235             if (response.hasMoreResults() && !response.getMoreResults()) {
236               scannerId = -1L;
237               closed = true;
238               // Implied that no results were returned back, either.
239               return null;
240             }
241             // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
242             // to size or quantity of results in the response.
243             if (response.hasMoreResultsInRegion()) {
244               // Set what the RS said
245               setHasMoreResultsContext(true);
246               setServerHasMoreResults(response.getMoreResultsInRegion());
247             } else {
248               // Server didn't respond whether it has more results or not.
249               setHasMoreResultsContext(false);
250             }
251           } catch (ServiceException se) {
252             throw ProtobufUtil.getRemoteException(se);
253           }
254           updateResultsMetrics(rrs);
255         } catch (IOException e) {
256           if (logScannerActivity) {
257             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
258               + " to " + getLocation(), e);
259           }
260           IOException ioe = e;
261           if (e instanceof RemoteException) {
262             ioe = ((RemoteException) e).unwrapRemoteException();
263           }
264           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
265             try {
266               HRegionLocation location =
267                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
268               LOG.info("Scanner=" + scannerId
269                 + " expired, current region location is " + location.toString());
270             } catch (Throwable t) {
271               LOG.info("Failed to relocate region", t);
272             }
273           }
274           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
275           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
276           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
277           // a scan when doing a next in particular, we want to break out and get the scanner to
278           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
279           // yeah and hard to follow and in need of a refactor).
280           if (ioe instanceof NotServingRegionException) {
281             // Throw a DNRE so that we break out of cycle of calling NSRE
282             // when what we need is to open scanner against new location.
283             // Attach NSRE to signal client that it needs to re-setup scanner.
284             if (this.scanMetrics != null) {
285               this.scanMetrics.countOfNSRE.incrementAndGet();
286             }
287             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
288           } else if (ioe instanceof RegionServerStoppedException) {
289             // Throw a DNRE so that we break out of cycle of the retries and instead go and
290             // open scanner against new location.
291             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
292           } else {
293             // The outer layers will retry
294             throw ioe;
295           }
296         }
297         return rrs;
298       }
299     }
300     return null;
301   }
302 
303   /**
304    * @return true when the most recent RPC response indicated that the response was a heartbeat
305    *         message. Heartbeat messages are sent back from the server when the processing of the
306    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
307    *         timeouts during long running scan operations.
308    */
309   protected boolean isHeartbeatMessage() {
310     return heartbeatMessage;
311   }
312 
313   protected void setHeartbeatMessage(boolean heartbeatMessage) {
314     this.heartbeatMessage = heartbeatMessage;
315   }
316 
317   private void incRPCcallsMetrics() {
318     if (this.scanMetrics == null) {
319       return;
320     }
321     this.scanMetrics.countOfRPCcalls.incrementAndGet();
322     if (isRegionServerRemote) {
323       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
324     }
325   }
326 
327   private void updateResultsMetrics(Result[] rrs) {
328     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
329       return;
330     }
331     long resultSize = 0;
332     for (Result rr : rrs) {
333       for (Cell cell : rr.rawCells()) {
334         resultSize += CellUtil.estimatedSerializedSizeOf(cell);
335       }
336     }
337     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
338     if (isRegionServerRemote) {
339       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
340     }
341   }
342 
343   private void close() {
344     if (this.scannerId == -1L) {
345       return;
346     }
347     try {
348       incRPCcallsMetrics();
349       ScanRequest request =
350         RequestConverter.buildScanRequest(this.scannerId, 0, true);
351       try {
352         getStub().scan(null, request);
353       } catch (ServiceException se) {
354         throw ProtobufUtil.getRemoteException(se);
355       }
356     } catch (IOException e) {
357       LOG.warn("Ignore, probably already closed", e);
358     }
359     this.scannerId = -1L;
360   }
361 
362   protected long openScanner() throws IOException {
363     incRPCcallsMetrics();
364     ScanRequest request =
365       RequestConverter.buildScanRequest(
366         getLocation().getRegionInfo().getRegionName(),
367         this.scan, 0, false);
368     try {
369       ScanResponse response = getStub().scan(null, request);
370       long id = response.getScannerId();
371       if (logScannerActivity) {
372         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
373           + " on region " + getLocation().toString());
374       }
375       return id;
376     } catch (ServiceException se) {
377       throw ProtobufUtil.getRemoteException(se);
378     }
379   }
380 
381   protected Scan getScan() {
382     return scan;
383   }
384 
385   /**
386    * Call this when the next invocation of call should close the scanner
387    */
388   public void setClose() {
389     this.closed = true;
390   }
391 
392   /**
393    * @return the HRegionInfo for the current region
394    */
395   @Override
396   public HRegionInfo getHRegionInfo() {
397     if (!instantiated) {
398       return null;
399     }
400     return getLocation().getRegionInfo();
401   }
402 
403   /**
404    * Get the number of rows that will be fetched on next
405    * @return the number of rows for caching
406    */
407   public int getCaching() {
408     return caching;
409   }
410 
411   @Override
412   public ClusterConnection getConnection() {
413     return cConnection;
414   }
415 
416   /**
417    * Set the number of rows that will be fetched on next
418    * @param caching the number of rows for caching
419    */
420   public void setCaching(int caching) {
421     this.caching = caching;
422   }
423 
424   public ScannerCallable getScannerCallableForReplica(int id) {
425     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
426         this.getScan(), this.scanMetrics, controllerFactory, id);
427     s.setCaching(this.caching);
428     return s;
429   }
430 
431   /**
432    * Should the client attempt to fetch more results from this region
433    * @return True if the client should attempt to fetch more results, false otherwise.
434    */
435   protected boolean getServerHasMoreResults() {
436     assert serverHasMoreResultsContext;
437     return this.serverHasMoreResults;
438   }
439 
440   protected void setServerHasMoreResults(boolean serverHasMoreResults) {
441     this.serverHasMoreResults = serverHasMoreResults;
442   }
443 
444   /**
445    * Did the server respond with information about whether more results might exist.
446    * Not guaranteed to respond with older server versions
447    * @return True if the server responded with information about more results.
448    */
449   protected boolean hasMoreResultsContext() {
450     return serverHasMoreResultsContext;
451   }
452 
453   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
454     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
455   }
456 }