/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;

import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;

/**
 * Scanner operations such as create, next, etc.
 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
 * {@link RpcRetryingCaller} so failures are retried.
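 * <p>
 * A minimal usage sketch (illustrative only; in practice this callable is driven by
 * {@link ClientScanner} rather than used directly). The first invocation opens the scanner and
 * returns null; later invocations fetch up to {@link #getCaching()} rows; after
 * {@link #setClose()} the next invocation closes the scanner:
 * <pre>
 * Configuration conf = connection.getConfiguration();
 * ScannerCallable callable = new ScannerCallable(connection, tableName, scan,
 *     null, RpcControllerFactory.instantiate(conf)); // null: collect no scan metrics
 * callable.setCaching(100);
 * RpcRetryingCaller&lt;Result[]&gt; caller =
 *     RpcRetryingCallerFactory.instantiate(conf).&lt;Result[]&gt;newCaller();
 * caller.callWithRetries(callable, timeout);                 // opens the scanner
 * Result[] rows = caller.callWithRetries(callable, timeout); // fetches a batch
 * callable.setClose();
 * caller.callWithRetries(callable, timeout);                 // closes the scanner
 * </pre>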
 */
@InterfaceAudience.Private
public class ScannerCallable extends RegionServerCallable<Result[]> {
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
  protected long scannerId = -1L;
  protected boolean instantiated = false;
  protected boolean closed = false;
  private Scan scan;
  private int caching = 1;
  protected final ClusterConnection cConnection;
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  private static String myAddress;
  protected final int id;
  protected boolean serverHasMoreResultsContext;
  protected boolean serverHasMoreResults;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;
  static {
    try {
      myAddress = DNS.getDefaultHost("default", "default");
    } catch (UnknownHostException uhe) {
      LOG.error("cannot determine my address", uhe);
    }
  }

  // Indicates whether the call goes to a remote region server (vs. one on this host)
  protected boolean isRegionServerRemote = true;
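  // Sequence number for scan next() calls, kept in sync between client and region server;
  // a mismatch makes the server throw OutOfOrderScannerNextException (see HBASE-5974).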
  private long nextCallSeq = 0;
  protected RpcControllerFactory controllerFactory;
  protected PayloadCarryingRpcController controller;

  /**
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null, ScannerCallable won't collect
   *          metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
    this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
  }

  /**
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null, ScannerCallable won't collect metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   * @param id the replicaId
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
    super(connection, tableName, scan.getStartRow());
    this.id = id;
    this.cConnection = connection;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.controllerFactory = rpcControllerFactory;
  }

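  /**
   * @return the {@link PayloadCarryingRpcController} created for the most recent scan RPC,
   *         or null if no fetch RPC has been issued yet
   */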
  PayloadCarryingRpcController getController() {
    return controller;
  }

  /**
   * @param reload force reload of server location
   * @throws IOException
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
        id, getConnection(), getTableName(), getRow());
    location = id < rl.size() ? rl.getRegionLocation(id) : null;
    if (location == null || location.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      //  when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }

    // check how often we retry.
    if (reload && this.scanMetrics != null) {
      this.scanMetrics.countOfRPCRetries.incrementAndGet();
      if (isRegionServerRemote) {
        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
      }
    }
  }

  /**
   * Compare the local machine's hostname with the region server's hostname to decide whether
   * the HBase client is connecting to a remote region server.
   */
  protected void checkIfRegionServerIsRemote() {
    isRegionServerRemote = !getLocation().getHostname().equalsIgnoreCase(myAddress);
  }
  @Override
  public Result [] call(int callTimeout) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      if (scannerId != -1L) {
        close();
      }
    } else {
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
        setHeartbeatMessage(false);
        try {
          incRPCcallsMetrics();
          request =
              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                this.scanMetrics != null);
          ScanResponse response = null;
          controller = controllerFactory.newController();
          controller.setPriority(getTableName());
          controller.setCallTimeout(callTimeout);
          try {
            response = getStub().scan(controller, request);
            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
            // from client to server increments this number on both sides. The client passes the
            // number along with the request, and on the RS side the incoming nextCallSeq is
            // matched against the server's own. In case of a timeout, this increment on the
            // client side should not happen. If the server side already finished fetching the
            // next batch of data, the nextCallSeq numbers will mismatch; the server will throw
            // OutOfOrderScannerNextException and the client will then reopen the scanner with
            // the last successfully retrieved row as the start row.
            // See HBASE-5974
            nextCallSeq++;
            long timestamp = System.currentTimeMillis();
            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
            // Results are returned via controller
            CellScanner cellScanner = controller.cellScanner();
            rrs = ResponseConverter.getResults(cellScanner, response);
            if (logScannerActivity) {
              long now = System.currentTimeMillis();
              if (now - timestamp > logCutOffLatency) {
                int rows = rrs == null ? 0 : rrs.length;
                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                  + rows + " rows from scanner=" + scannerId);
              }
            }
            updateServerSideMetrics(response);
            // moreResults is only used for the case where a filter exhausts all elements
            if (response.hasMoreResults() && !response.getMoreResults()) {
              scannerId = -1L;
              closed = true;
              // Implied that no results were returned back, either.
              return null;
            }
            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
            // to size or quantity of results in the response.
            if (response.hasMoreResultsInRegion()) {
              // Set what the RS said
              setHasMoreResultsContext(true);
              setServerHasMoreResults(response.getMoreResultsInRegion());
            } else {
              // Server didn't respond whether it has more results or not.
              setHasMoreResultsContext(false);
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
          updateResultsMetrics(rrs);
        } catch (IOException e) {
          if (logScannerActivity) {
            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
              + " to " + getLocation(), e);
          }
          IOException ioe = e;
          if (e instanceof RemoteException) {
            ioe = ((RemoteException) e).unwrapRemoteException();
          }
          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
            try {
              HRegionLocation location =
                getConnection().relocateRegion(getTableName(), scan.getStartRow());
              LOG.info("Scanner=" + scannerId
                + " expired, current region location is " + location.toString());
            } catch (Throwable t) {
              LOG.info("Failed to relocate region", t);
            }
          }
          // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
          // Why not just have these exceptions implement DNRIOE, you ask?  Well, usually we want
          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
          // a scan, when doing a next in particular, we want to break out and get the scanner to
          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (it's ugly,
          // yeah, and hard to follow and in need of a refactor).
          if (ioe instanceof NotServingRegionException) {
            // Throw a DNRE so that we break out of cycle of calling NSRE
            // when what we need is to open scanner against new location.
            // Attach NSRE to signal client that it needs to re-setup scanner.
            if (this.scanMetrics != null) {
              this.scanMetrics.countOfNSRE.incrementAndGet();
            }
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else if (ioe instanceof RegionServerStoppedException) {
            // Throw a DNRE so that we break out of cycle of the retries and instead go and
            // open scanner against new location.
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else {
            // The outer layers will retry
            throw ioe;
          }
        }
        return rrs;
      }
    }
    return null;
  }

  /**
   * @return true when the most recent RPC response indicated that the response was a heartbeat
   *         message. Heartbeat messages are sent back from the server when the processing of the
   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
   *         timeouts during long running scan operations.
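   * <p>
   * A sketch of how a caller might react (illustrative only; {@code caller}, {@code callable}
   * and {@code timeout} are assumed to be wired up as in the class-level example):
   * <pre>
   * Result[] values = caller.callWithRetries(callable, timeout);
   * if ((values == null || values.length == 0) &amp;&amp; callable.isHeartbeatMessage()) {
   *   // Server is still working on the scan; call again rather than treating
   *   // the empty batch as the end of the region or of the scan.
   * }
   * </pre>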
   */
  protected boolean isHeartbeatMessage() {
    return heartbeatMessage;
  }

  protected void setHeartbeatMessage(boolean heartbeatMessage) {
    this.heartbeatMessage = heartbeatMessage;
  }

  private void incRPCcallsMetrics() {
    if (this.scanMetrics == null) {
      return;
    }
    this.scanMetrics.countOfRPCcalls.incrementAndGet();
    if (isRegionServerRemote) {
      this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
    }
  }

  protected void updateResultsMetrics(Result[] rrs) {
    if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
      return;
    }
    long resultSize = 0;
    for (Result rr : rrs) {
      for (Cell cell : rr.rawCells()) {
        resultSize += CellUtil.estimatedSerializedSizeOf(cell);
      }
    }
    this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
    if (isRegionServerRemote) {
      this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
    }
  }

  /**
   * Use the scan metrics returned by the server to add to the identically named counters in the
   * client side metrics. If a counter does not exist with the same name as the server side metric,
   * the attempt to increase the counter will fail.
   * @param response the server's scan response, possibly carrying server-side scan metrics
   */
  private void updateServerSideMetrics(ScanResponse response) {
    if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;

    Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
    for (Entry<String, Long> entry : serverMetrics.entrySet()) {
      this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
    }
  }

  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCcallsMetrics();
      ScanRequest request =
          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
      try {
        getStub().scan(null, request);
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      }
    } catch (IOException e) {
      LOG.warn("Ignore, probably already closed", e);
    }
    this.scannerId = -1L;
  }

  protected long openScanner() throws IOException {
    incRPCcallsMetrics();
    ScanRequest request =
      RequestConverter.buildScanRequest(
        getLocation().getRegionInfo().getRegionName(),
        this.scan, 0, false);
    try {
      ScanResponse response = getStub().scan(null, request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
          + " on region " + getLocation().toString());
      }
      return id;
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * @return the HRegionInfo for the current region
   */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on next
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  @Override
  public ClusterConnection getConnection() {
    return cConnection;
  }

  /**
   * Set the number of rows that will be fetched on next
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

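  /**
   * @param id the replicaId of the region to target
   * @return a new ScannerCallable against the given replica, reusing this callable's scan,
   *         scan metrics, controller factory, and caching setting
   */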
  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
        this.getScan(), this.scanMetrics, controllerFactory, id);
    s.setCaching(this.caching);
    return s;
  }

  /**
   * Should the client attempt to fetch more results from this region?
   * @return True if the client should attempt to fetch more results, false otherwise.
   */
  protected boolean getServerHasMoreResults() {
    assert serverHasMoreResultsContext;
    return this.serverHasMoreResults;
  }

  protected void setServerHasMoreResults(boolean serverHasMoreResults) {
    this.serverHasMoreResults = serverHasMoreResults;
  }

  /**
   * Did the server respond with information about whether more results might exist?
   * Older server versions are not guaranteed to provide this context.
   * @return True if the server responded with information about more results.
   */
  protected boolean hasMoreResultsContext() {
    return serverHasMoreResultsContext;
  }

  protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
    this.serverHasMoreResultsContext = serverHasMoreResultsContext;
  }
}
477 }