View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.net.UnknownHostException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.RemoteExceptionHandler;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.UnknownScannerException;
38  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
40  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
41  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
42  import org.apache.hadoop.hbase.protobuf.RequestConverter;
43  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
44  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
45  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
46  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47  import org.apache.hadoop.ipc.RemoteException;
48  import org.apache.hadoop.net.DNS;
49  
50  import com.google.protobuf.RpcController;
51  import com.google.protobuf.ServiceException;
52  import com.google.protobuf.TextFormat;
53  
54  /**
55   * Scanner operations such as create, next, etc.
56   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
57   * {@link RpcRetryingCaller} so fails are retried.
58   */
59  @InterfaceAudience.Private
60  public class ScannerCallable extends RegionServerCallable<Result[]> {
61    public static final String LOG_SCANNER_LATENCY_CUTOFF
62      = "hbase.client.log.scanner.latency.cutoff";
63    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
64  
65    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
66    private long scannerId = -1L;
67    protected boolean instantiated = false;
68    private boolean closed = false;
69    private Scan scan;
70    private int caching = 1;
71    protected ScanMetrics scanMetrics;
72    private boolean logScannerActivity = false;
73    private int logCutOffLatency = 1000;
74    private static String myAddress;
75    static {
76      try {
77        myAddress = DNS.getDefaultHost("default", "default");
78      } catch (UnknownHostException uhe) {
79        LOG.error("cannot determine my address", uhe);
80      }
81    }
82  
83    // indicate if it is a remote server call
84    protected boolean isRegionServerRemote = true;
85    private long nextCallSeq = 0;
86    private RpcControllerFactory rpcFactory;
87    
88    /**
89     * @param connection which connection
90     * @param tableName table callable is on
91     * @param scan the scan to execute
92     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
93     *          metrics
94     * @param rpcControllerFactory factory to use when creating {@link RpcController}
95     */
96    public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
97        ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
98      super(connection, tableName, scan.getStartRow());
99      this.scan = scan;
100     this.scanMetrics = scanMetrics;
101     Configuration conf = connection.getConfiguration();
102     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
103     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
104     this.rpcFactory = rpcControllerFactory;
105   }
106 
107   /**
108    * @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, ScanMetrics,
109    *                 RpcControllerFactory)}
110    */
111   @Deprecated
112   public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
113       ScanMetrics scanMetrics) {
114     this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
115         .instantiate(connection.getConfiguration()));
116   }
117 
118   /**
119    * @param reload force reload of server location
120    * @throws IOException
121    */
122   @Override
123   public void prepare(boolean reload) throws IOException {
124     if (!instantiated || reload) {
125       super.prepare(reload);
126       checkIfRegionServerIsRemote();
127       instantiated = true;
128     }
129 
130     // check how often we retry.
131     // HConnectionManager will call instantiateServer with reload==true
132     // if and only if for retries.
133     if (reload && this.scanMetrics != null) {
134       this.scanMetrics.countOfRPCRetries.incrementAndGet();
135       if (isRegionServerRemote) {
136         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
137       }
138     }
139   }
140 
141   /**
142    * compare the local machine hostname with region server's hostname
143    * to decide if hbase client connects to a remote region server
144    */
145   protected void checkIfRegionServerIsRemote() {
146     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
147       isRegionServerRemote = false;
148     } else {
149       isRegionServerRemote = true;
150     }
151   }
152 
153 
154   @Override
155   @SuppressWarnings("deprecation")
156   public Result [] call(int callTimeout) throws IOException {
157     if (closed) {
158       if (scannerId != -1) {
159         close();
160       }
161     } else {
162       if (scannerId == -1L) {
163         this.scannerId = openScanner();
164       } else {
165         Result [] rrs = null;
166         ScanRequest request = null;
167         try {
168           incRPCcallsMetrics();
169           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
170           ScanResponse response = null;
171           PayloadCarryingRpcController controller = rpcFactory.newController();
172           controller.setPriority(getTableName());
173           controller.setCallTimeout(callTimeout);
174           try {
175             response = getStub().scan(controller, request);
176             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
177             // from client to server will increment this number in both sides. Client passes this
178             // number along with the request and at RS side both the incoming nextCallSeq and its
179             // nextCallSeq will be matched. In case of a timeout this increment at the client side
180             // should not happen. If at the server side fetching of next batch of data was over,
181             // there will be mismatch in the nextCallSeq number. Server will throw
182             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
183             // as the last successfully retrieved row.
184             // See HBASE-5974
185             nextCallSeq++;
186             long timestamp = System.currentTimeMillis();
187             // Results are returned via controller
188             CellScanner cellScanner = controller.cellScanner();
189             rrs = ResponseConverter.getResults(cellScanner, response);
190             if (logScannerActivity) {
191               long now = System.currentTimeMillis();
192               if (now - timestamp > logCutOffLatency) {
193                 int rows = rrs == null ? 0 : rrs.length;
194                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
195                   + rows + " rows from scanner=" + scannerId);
196               }
197             }
198             if (response.hasMoreResults()
199                 && !response.getMoreResults()) {
200               scannerId = -1L;
201               closed = true;
202               return null;
203             }
204           } catch (ServiceException se) {
205             throw ProtobufUtil.getRemoteException(se);
206           }
207           updateResultsMetrics(rrs);
208         } catch (IOException e) {
209           if (logScannerActivity) {
210             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
211               + " to " + getLocation(), e);
212           }
213           IOException ioe = e;
214           if (e instanceof RemoteException) {
215             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
216           }
217           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
218             try {
219               HRegionLocation location =
220                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
221               LOG.info("Scanner=" + scannerId
222                 + " expired, current region location is " + location.toString());
223             } catch (Throwable t) {
224               LOG.info("Failed to relocate region", t);
225             }
226           }
227           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
228           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
229           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
230           // a scan when doing a next in particular, we want to break out and get the scanner to
231           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
232           // yeah and hard to follow and in need of a refactor).
233           if (ioe instanceof NotServingRegionException) {
234             // Throw a DNRE so that we break out of cycle of calling NSRE
235             // when what we need is to open scanner against new location.
236             // Attach NSRE to signal client that it needs to re-setup scanner.
237             if (this.scanMetrics != null) {
238               this.scanMetrics.countOfNSRE.incrementAndGet();
239             }
240             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
241           } else if (ioe instanceof RegionServerStoppedException) {
242             // Throw a DNRE so that we break out of cycle of the retries and instead go and
243             // open scanner against new location.
244             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
245           } else {
246             // The outer layers will retry
247             throw ioe;
248           }
249         }
250         return rrs;
251       }
252     }
253     return null;
254   }
255 
256   private void incRPCcallsMetrics() {
257     if (this.scanMetrics == null) {
258       return;
259     }
260     this.scanMetrics.countOfRPCcalls.incrementAndGet();
261     if (isRegionServerRemote) {
262       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
263     }
264   }
265 
266   private void updateResultsMetrics(Result[] rrs) {
267     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
268       return;
269     }
270     long resultSize = 0;
271     for (Result rr : rrs) {
272       for (Cell kv : rr.rawCells()) {
273         // TODO add getLength to Cell/use CellUtil#estimatedSizeOf
274         resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
275       }
276     }
277     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
278     if (isRegionServerRemote) {
279       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
280     }
281   }
282 
283   private void close() {
284     if (this.scannerId == -1L) {
285       return;
286     }
287     try {
288       incRPCcallsMetrics();
289       ScanRequest request =
290         RequestConverter.buildScanRequest(this.scannerId, 0, true);
291       try {
292         getStub().scan(null, request);
293       } catch (ServiceException se) {
294         throw ProtobufUtil.getRemoteException(se);
295       }
296     } catch (IOException e) {
297       LOG.warn("Ignore, probably already closed", e);
298     }
299     this.scannerId = -1L;
300   }
301 
302   protected long openScanner() throws IOException {
303     incRPCcallsMetrics();
304     ScanRequest request =
305       RequestConverter.buildScanRequest(
306         getLocation().getRegionInfo().getRegionName(),
307         this.scan, 0, false);
308     try {
309       ScanResponse response = getStub().scan(null, request);
310       long id = response.getScannerId();
311       if (logScannerActivity) {
312         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
313           + " on region " + getLocation().toString());
314       }
315       return id;
316     } catch (ServiceException se) {
317       throw ProtobufUtil.getRemoteException(se);
318     }
319   }
320 
321   protected Scan getScan() {
322     return scan;
323   }
324 
325   /**
326    * Call this when the next invocation of call should close the scanner
327    */
328   public void setClose() {
329     this.closed = true;
330   }
331 
332   /**
333    * @return the HRegionInfo for the current region
334    */
335   public HRegionInfo getHRegionInfo() {
336     if (!instantiated) {
337       return null;
338     }
339     return getLocation().getRegionInfo();
340   }
341 
342   /**
343    * Get the number of rows that will be fetched on next
344    * @return the number of rows for caching
345    */
346   public int getCaching() {
347     return caching;
348   }
349 
350   /**
351    * Set the number of rows that will be fetched on next
352    * @param caching the number of rows for caching
353    */
354   public void setCaching(int caching) {
355     this.caching = caching;
356   }
357 }