View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.DoNotRetryIOException;
33  import org.apache.hadoop.hbase.HBaseIOException;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HRegionLocation;
36  import org.apache.hadoop.hbase.NotServingRegionException;
37  import org.apache.hadoop.hbase.RegionLocations;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.UnknownScannerException;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
43  import org.apache.hadoop.hbase.ipc.HBaseRpcController;
44  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
45  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
46  import org.apache.hadoop.hbase.protobuf.RequestConverter;
47  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
48  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
49  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
50  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
51  import org.apache.hadoop.ipc.RemoteException;
52  import org.apache.hadoop.net.DNS;
53
/**
 * Scanner operations such as create, next, etc.
 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
 * {@link RpcRetryingCaller} so that failed calls are retried.
 */
59  @InterfaceAudience.Private
60  public class ScannerCallable extends RegionServerCallable<Result[]> {
  /**
   * Configuration key for the latency cutoff, in milliseconds, above which a slow scanner fetch
   * is logged (only when scanner activity logging is on). Read with a default of 1000.
   */
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  /** Configuration key that turns scanner activity logging on; read with a default of false. */
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
  // Id of the scanner currently open on the server; -1 means no scanner is open.
  protected long scannerId = -1L;
  // Set once prepare() has run the remote-server check for the current location.
  protected boolean instantiated = false;
  // When true, the next rpcCall() closes the scanner instead of fetching rows.
  protected boolean closed = false;
  // Forwarded to the scan request builder; see setRenew(boolean).
  protected boolean renew = false;
  private Scan scan;
  // Number of rows requested per next call.
  private int caching = 1;
  // May be null, in which case no metrics are collected.
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  // Host name of this client as resolved by the static initializer below; stays null when the
  // lookup fails.
  private static String myAddress;
  // The replica id this callable targets.
  protected final int id;
  // True when the last response said whether the region has more results.
  protected boolean serverHasMoreResultsContext;
  // The server's answer; only meaningful while serverHasMoreResultsContext is true.
  protected boolean serverHasMoreResults;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;
  // Resolve the local host name once per class load. A failure is logged and leaves myAddress
  // null. This block must stay after the LOG declaration because it uses LOG.
  static {
    try {
      myAddress = DNS.getDefaultHost("default", "default");
    } catch (UnknownHostException uhe) {
      LOG.error("cannot determine my address", uhe);
    }
  }

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  // Sequence number sent with each next request and incremented after each successful response.
  private long nextCallSeq = 0;
  protected final RpcControllerFactory rpcControllerFactory;
98  
99    /**
100    * @param connection which connection
101    * @param tableName table callable is on
102    * @param scan the scan to execute
103    * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
104    *          metrics
105    * @param rpcControllerFactory factory to use when creating 
106    *        {@link com.google.protobuf.RpcController}
107    */
108   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
109       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
110     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
111   }
112   /**
113    *
114    * @param connection
115    * @param tableName
116    * @param scan
117    * @param scanMetrics
118    * @param id the replicaId
119    */
120   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
121       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
122     super(connection, rpcControllerFactory, tableName, scan.getStartRow());
123     this.id = id;
124     this.scan = scan;
125     this.scanMetrics = scanMetrics;
126     Configuration conf = connection.getConfiguration();
127     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
128     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
129     this.rpcControllerFactory = rpcControllerFactory;
130   }
131
132   /**
133    * @param reload force reload of server location
134    * @throws IOException
135    */
136   @Override
137   public void prepare(boolean reload) throws IOException {
138     if (Thread.interrupted()) {
139       throw new InterruptedIOException();
140     }
141     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
142         id, getConnection(), getTableName(), getRow());
143     location = id < rl.size() ? rl.getRegionLocation(id) : null;
144     if (location == null || location.getServerName() == null) {
145       // With this exception, there will be a retry. The location can be null for a replica
146       //  when the table is created or after a split.
147       throw new HBaseIOException("There is no location for replica id #" + id);
148     }
149     ServerName dest = location.getServerName();
150     setStub(super.getConnection().getClient(dest));
151     if (!instantiated || reload) {
152       checkIfRegionServerIsRemote();
153       instantiated = true;
154     }
155
156     // check how often we retry.
157     if (reload && this.scanMetrics != null) {
158       this.scanMetrics.countOfRPCRetries.incrementAndGet();
159       if (isRegionServerRemote) {
160         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
161       }
162     }
163   }
164
165   /**
166    * compare the local machine hostname with region server's hostname
167    * to decide if hbase client connects to a remote region server
168    */
169   protected void checkIfRegionServerIsRemote() {
170     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
171       isRegionServerRemote = false;
172     } else {
173       isRegionServerRemote = true;
174     }
175   }
176
177   protected Result [] rpcCall() throws Exception {
178     if (Thread.interrupted()) {
179       throw new InterruptedIOException();
180     }
181     if (this.closed) {
182       if (this.scannerId != -1) {
183         close();
184       }
185     } else {
186       if (this.scannerId == -1L) {
187         this.scannerId = openScanner();
188       } else {
189         Result [] rrs = null;
190         ScanRequest request = null;
191         // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
192         setHeartbeatMessage(false);
193         try {
194           incRPCcallsMetrics();
195           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
196                 this.scanMetrics != null, renew);
197           ScanResponse response = null;
198           response = getStub().scan(getRpcController(), request);
199           // Client and RS maintain a nextCallSeq number during the scan. Every next() call
200           // from client to server will increment this number in both sides. Client passes this
201           // number along with the request and at RS side both the incoming nextCallSeq and its
202           // nextCallSeq will be matched. In case of a timeout this increment at the client side
203           // should not happen. If at the server side fetching of next batch of data was over,
204           // there will be mismatch in the nextCallSeq number. Server will throw
205           // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
206           // as the last successfully retrieved row.
207           // See HBASE-5974
208           nextCallSeq++;
209           long timestamp = System.currentTimeMillis();
210           setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
211           rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
212           if (logScannerActivity) {
213             long now = System.currentTimeMillis();
214             if (now - timestamp > logCutOffLatency) {
215               int rows = rrs == null ? 0 : rrs.length;
216               LOG.info("Took " + (now-timestamp) + "ms to fetch "
217                   + rows + " rows from scanner=" + scannerId);
218             }
219           }
220           updateServerSideMetrics(response);
221           // moreResults is only used for the case where a filter exhausts all elements
222           if (response.hasMoreResults() && !response.getMoreResults()) {
223             this.scannerId = -1L;
224             this.closed = true;
225             // Implied that no results were returned back, either.
226             return null;
227           }
228           // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
229           // to size or quantity of results in the response.
230           if (response.hasMoreResultsInRegion()) {
231             // Set what the RS said
232             setHasMoreResultsContext(true);
233             setServerHasMoreResults(response.getMoreResultsInRegion());
234           } else {
235             // Server didn't respond whether it has more results or not.
236             setHasMoreResultsContext(false);
237           }
238           updateResultsMetrics(rrs);
239         } catch (IOException e) {
240           if (logScannerActivity) {
241             LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
242                 getLocation(), e);
243           }
244           IOException ioe = e;
245           if (e instanceof RemoteException) {
246             ioe = ((RemoteException) e).unwrapRemoteException();
247           }
248           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
249             try {
250               HRegionLocation location =
251                   getConnection().relocateRegion(getTableName(), scan.getStartRow());
252               LOG.info("Scanner=" + scannerId + " expired, current region location is " +
253                   location.toString());
254             } catch (Throwable t) {
255               LOG.info("Failed to relocate region", t);
256             }
257           }
258           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
259           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
260           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
261           // a scan when doing a next in particular, we want to break out and get the scanner to
262           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
263           // yeah and hard to follow and in need of a refactor).
264           if (ioe instanceof NotServingRegionException) {
265             // Throw a DNRE so that we break out of cycle of calling NSRE
266             // when what we need is to open scanner against new location.
267             // Attach NSRE to signal client that it needs to re-setup scanner.
268             if (this.scanMetrics != null) {
269               this.scanMetrics.countOfNSRE.incrementAndGet();
270             }
271             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
272           } else if (ioe instanceof RegionServerStoppedException) {
273             // Throw a DNRE so that we break out of cycle of the retries and instead go and
274             // open scanner against new location.
275             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
276           } else {
277             // The outer layers will retry
278             throw ioe;
279           }
280         }
281         return rrs;
282       }
283     }
284     return null;
285   }
286
287   /**
288    * @return true when the most recent RPC response indicated that the response was a heartbeat
289    *         message. Heartbeat messages are sent back from the server when the processing of the
290    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
291    *         timeouts during long running scan operations.
292    */
293   protected boolean isHeartbeatMessage() {
294     return heartbeatMessage;
295   }
296
297   protected void setHeartbeatMessage(boolean heartbeatMessage) {
298     this.heartbeatMessage = heartbeatMessage;
299   }
300
301   private void incRPCcallsMetrics() {
302     if (this.scanMetrics == null) {
303       return;
304     }
305     this.scanMetrics.countOfRPCcalls.incrementAndGet();
306     if (isRegionServerRemote) {
307       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
308     }
309   }
310
311   protected void updateResultsMetrics(Result[] rrs) {
312     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
313       return;
314     }
315     long resultSize = 0;
316     for (Result rr : rrs) {
317       for (Cell cell : rr.rawCells()) {
318         resultSize += CellUtil.estimatedSerializedSizeOf(cell);
319       }
320     }
321     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
322     if (isRegionServerRemote) {
323       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
324     }
325   }
326
327   /**
328    * Use the scan metrics returned by the server to add to the identically named counters in the
329    * client side metrics. If a counter does not exist with the same name as the server side metric,
330    * the attempt to increase the counter will fail.
331    * @param response
332    */
333   private void updateServerSideMetrics(ScanResponse response) {
334     if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
335
336     Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
337     for (Entry<String, Long> entry : serverMetrics.entrySet()) {
338       this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
339     }
340   }
341
342   private void close() {
343     if (this.scannerId == -1L) {
344       return;
345     }
346     try {
347       incRPCcallsMetrics();
348       ScanRequest request =
349           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
350       try {
351         getStub().scan(getRpcController(), request);
352       } catch (Exception e) {
353         throw ProtobufUtil.handleRemoteException(e);
354       }
355     } catch (IOException e) {
356       LOG.warn("Ignore, probably already closed", e);
357     }
358     this.scannerId = -1L;
359   }
360
361   protected long openScanner() throws IOException {
362     incRPCcallsMetrics();
363     ScanRequest request = RequestConverter.buildScanRequest(
364         getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
365     try {
366       ScanResponse response = getStub().scan(getRpcController(), request);
367       long id = response.getScannerId();
368       if (logScannerActivity) {
369         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
370           + " on region " + getLocation().toString());
371       }
372       return id;
373     } catch (Exception e) {
374       throw ProtobufUtil.handleRemoteException(e);
375     }
376   }
377
378   protected Scan getScan() {
379     return scan;
380   }
381
382   /**
383    * Call this when the next invocation of call should close the scanner
384    */
385   public void setClose() {
386     this.closed = true;
387   }
388
389   /**
390    * Indicate whether we make a call only to renew the lease, but without affected the scanner in
391    * any other way.
392    * @param val true if only the lease should be renewed
393    */
394   public void setRenew(boolean val) {
395     this.renew = val;
396   }
397
398   /**
399    * @return the HRegionInfo for the current region
400    */
401   @Override
402   public HRegionInfo getHRegionInfo() {
403     if (!instantiated) {
404       return null;
405     }
406     return getLocation().getRegionInfo();
407   }
408
409   /**
410    * Get the number of rows that will be fetched on next
411    * @return the number of rows for caching
412    */
413   public int getCaching() {
414     return caching;
415   }
416
417   /**
418    * Set the number of rows that will be fetched on next
419    * @param caching the number of rows for caching
420    */
421   public void setCaching(int caching) {
422     this.caching = caching;
423   }
424
425   public ScannerCallable getScannerCallableForReplica(int id) {
426     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
427         this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
428     s.setCaching(this.caching);
429     return s;
430   }
431
432   /**
433    * Should the client attempt to fetch more results from this region
434    * @return True if the client should attempt to fetch more results, false otherwise.
435    */
436   protected boolean getServerHasMoreResults() {
437     assert serverHasMoreResultsContext;
438     return this.serverHasMoreResults;
439   }
440
441   protected void setServerHasMoreResults(boolean serverHasMoreResults) {
442     this.serverHasMoreResults = serverHasMoreResults;
443   }
444
445   /**
446    * Did the server respond with information about whether more results might exist.
447    * Not guaranteed to respond with older server versions
448    * @return True if the server responded with information about more results.
449    */
450   protected boolean hasMoreResultsContext() {
451     return serverHasMoreResultsContext;
452   }
453
454   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
455     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
456   }
457 }