View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.net.UnknownHostException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.RemoteExceptionHandler;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.UnknownScannerException;
38  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
40  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41  import org.apache.hadoop.hbase.protobuf.RequestConverter;
42  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
43  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
44  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
45  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
46  import org.apache.hadoop.ipc.RemoteException;
47  import org.apache.hadoop.net.DNS;
48  
49  import com.google.protobuf.ServiceException;
50  import com.google.protobuf.TextFormat;
51  
52  /**
53   * Scanner operations such as create, next, etc.
54   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
55   * {@link RpcRetryingCaller} so fails are retried.
56   */
57  @InterfaceAudience.Private
58  public class ScannerCallable extends RegionServerCallable<Result[]> {
59    public static final String LOG_SCANNER_LATENCY_CUTOFF
60      = "hbase.client.log.scanner.latency.cutoff";
61    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
62  
63    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
64    private long scannerId = -1L;
65    protected boolean instantiated = false;
66    private boolean closed = false;
67    private Scan scan;
68    private int caching = 1;
69    protected ScanMetrics scanMetrics;
70    private boolean logScannerActivity = false;
71    private int logCutOffLatency = 1000;
72    private static String myAddress;
73    static {
74      try {
75        myAddress = DNS.getDefaultHost("default", "default");
76      } catch (UnknownHostException uhe) {
77        LOG.error("cannot determine my address", uhe);
78      }
79    }
80  
81    // indicate if it is a remote server call
82    protected boolean isRegionServerRemote = true;
83    private long nextCallSeq = 0;
84    
85    /**
86     * @param connection which connection
87     * @param tableName table callable is on
88     * @param scan the scan to execute
89     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
90     * won't collect metrics
91     */
92    public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
93      ScanMetrics scanMetrics) {
94      super(connection, tableName, scan.getStartRow());
95      this.scan = scan;
96      this.scanMetrics = scanMetrics;
97      Configuration conf = connection.getConfiguration();
98      logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
99      logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
100   }
101 
102   /**
103    * @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, ScanMetrics)}
104    */
105   @Deprecated
106   public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
107       ScanMetrics scanMetrics) {
108     this(connection, TableName.valueOf(tableName), scan, scanMetrics);
109   }
110 
111   /**
112    * @param reload force reload of server location
113    * @throws IOException
114    */
115   @Override
116   public void prepare(boolean reload) throws IOException {
117     if (!instantiated || reload) {
118       super.prepare(reload);
119       checkIfRegionServerIsRemote();
120       instantiated = true;
121     }
122 
123     // check how often we retry.
124     // HConnectionManager will call instantiateServer with reload==true
125     // if and only if for retries.
126     if (reload && this.scanMetrics != null) {
127       this.scanMetrics.countOfRPCRetries.incrementAndGet();
128       if (isRegionServerRemote) {
129         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
130       }
131     }
132   }
133 
134   /**
135    * compare the local machine hostname with region server's hostname
136    * to decide if hbase client connects to a remote region server
137    */
138   protected void checkIfRegionServerIsRemote() {
139     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
140       isRegionServerRemote = false;
141     } else {
142       isRegionServerRemote = true;
143     }
144   }
145 
146 
147   @Override
148   @SuppressWarnings("deprecation")
149   public Result [] call(int callTimeout) throws IOException {
150     if (closed) {
151       if (scannerId != -1) {
152         close();
153       }
154     } else {
155       if (scannerId == -1L) {
156         this.scannerId = openScanner();
157       } else {
158         Result [] rrs = null;
159         ScanRequest request = null;
160         try {
161           incRPCcallsMetrics();
162           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
163           ScanResponse response = null;
164           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
165           controller.setPriority(getTableName());
166           controller.setCallTimeout(callTimeout);
167           try {
168             response = getStub().scan(controller, request);
169             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
170             // from client to server will increment this number in both sides. Client passes this
171             // number along with the request and at RS side both the incoming nextCallSeq and its
172             // nextCallSeq will be matched. In case of a timeout this increment at the client side
173             // should not happen. If at the server side fetching of next batch of data was over,
174             // there will be mismatch in the nextCallSeq number. Server will throw
175             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
176             // as the last successfully retrieved row.
177             // See HBASE-5974
178             nextCallSeq++;
179             long timestamp = System.currentTimeMillis();
180             // Results are returned via controller
181             CellScanner cellScanner = controller.cellScanner();
182             rrs = ResponseConverter.getResults(cellScanner, response);
183             if (logScannerActivity) {
184               long now = System.currentTimeMillis();
185               if (now - timestamp > logCutOffLatency) {
186                 int rows = rrs == null ? 0 : rrs.length;
187                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
188                   + rows + " rows from scanner=" + scannerId);
189               }
190             }
191             if (response.hasMoreResults()
192                 && !response.getMoreResults()) {
193               scannerId = -1L;
194               closed = true;
195               return null;
196             }
197           } catch (ServiceException se) {
198             throw ProtobufUtil.getRemoteException(se);
199           }
200           updateResultsMetrics(rrs);
201         } catch (IOException e) {
202           if (logScannerActivity) {
203             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
204               + " to " + getLocation(), e);
205           }
206           IOException ioe = e;
207           if (e instanceof RemoteException) {
208             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
209           }
210           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
211             try {
212               HRegionLocation location =
213                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
214               LOG.info("Scanner=" + scannerId
215                 + " expired, current region location is " + location.toString());
216             } catch (Throwable t) {
217               LOG.info("Failed to relocate region", t);
218             }
219           }
220           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
221           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
222           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
223           // a scan when doing a next in particular, we want to break out and get the scanner to
224           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
225           // yeah and hard to follow and in need of a refactor).
226           if (ioe instanceof NotServingRegionException) {
227             // Throw a DNRE so that we break out of cycle of calling NSRE
228             // when what we need is to open scanner against new location.
229             // Attach NSRE to signal client that it needs to re-setup scanner.
230             if (this.scanMetrics != null) {
231               this.scanMetrics.countOfNSRE.incrementAndGet();
232             }
233             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
234           } else if (ioe instanceof RegionServerStoppedException) {
235             // Throw a DNRE so that we break out of cycle of the retries and instead go and
236             // open scanner against new location.
237             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
238           } else {
239             // The outer layers will retry
240             throw ioe;
241           }
242         }
243         return rrs;
244       }
245     }
246     return null;
247   }
248 
249   private void incRPCcallsMetrics() {
250     if (this.scanMetrics == null) {
251       return;
252     }
253     this.scanMetrics.countOfRPCcalls.incrementAndGet();
254     if (isRegionServerRemote) {
255       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
256     }
257   }
258 
259   private void updateResultsMetrics(Result[] rrs) {
260     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
261       return;
262     }
263     long resultSize = 0;
264     for (Result rr : rrs) {
265       for (Cell kv : rr.rawCells()) {
266         // TODO add getLength to Cell/use CellUtil#estimatedSizeOf
267         resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
268       }
269     }
270     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
271     if (isRegionServerRemote) {
272       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
273     }
274   }
275 
276   private void close() {
277     if (this.scannerId == -1L) {
278       return;
279     }
280     try {
281       incRPCcallsMetrics();
282       ScanRequest request =
283         RequestConverter.buildScanRequest(this.scannerId, 0, true);
284       try {
285         getStub().scan(null, request);
286       } catch (ServiceException se) {
287         throw ProtobufUtil.getRemoteException(se);
288       }
289     } catch (IOException e) {
290       LOG.warn("Ignore, probably already closed", e);
291     }
292     this.scannerId = -1L;
293   }
294 
295   protected long openScanner() throws IOException {
296     incRPCcallsMetrics();
297     ScanRequest request =
298       RequestConverter.buildScanRequest(
299         getLocation().getRegionInfo().getRegionName(),
300         this.scan, 0, false);
301     try {
302       ScanResponse response = getStub().scan(null, request);
303       long id = response.getScannerId();
304       if (logScannerActivity) {
305         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
306           + " on region " + getLocation().toString());
307       }
308       return id;
309     } catch (ServiceException se) {
310       throw ProtobufUtil.getRemoteException(se);
311     }
312   }
313 
314   protected Scan getScan() {
315     return scan;
316   }
317 
318   /**
319    * Call this when the next invocation of call should close the scanner
320    */
321   public void setClose() {
322     this.closed = true;
323   }
324 
325   /**
326    * @return the HRegionInfo for the current region
327    */
328   public HRegionInfo getHRegionInfo() {
329     if (!instantiated) {
330       return null;
331     }
332     return getLocation().getRegionInfo();
333   }
334 
335   /**
336    * Get the number of rows that will be fetched on next
337    * @return the number of rows for caching
338    */
339   public int getCaching() {
340     return caching;
341   }
342 
343   /**
344    * Set the number of rows that will be fetched on next
345    * @param caching the number of rows for caching
346    */
347   public void setCaching(int caching) {
348     this.caching = caching;
349   }
350 }