/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;

import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;

/**
 * Scanner operations such as create, next, etc.
 * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
 * {@link RpcRetryingCaller} so failures are retried.
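 *
 * <p>A minimal, illustrative usage sketch (assumes an existing {@link ClusterConnection}
 * {@code conn} and {@link RpcControllerFactory} {@code rpcFactory}; exception handling is
 * omitted, and normal clients go through {@link ClientScanner} and {@link RpcRetryingCaller}
 * rather than driving this class directly):
 * <pre>{@code
 * Scan scan = new Scan();
 * ScannerCallable callable =
 *     new ScannerCallable(conn, TableName.valueOf("myTable"), scan, null, rpcFactory);
 * callable.setCaching(100);       // rows fetched per next RPC
 * callable.prepare(false);        // locate the region and set up the client stub
 * callable.call(60000);           // first call only opens the scanner (returns null)
 * Result[] batch = callable.call(60000);  // subsequent calls fetch batches of rows
 * callable.setClose();
 * callable.call(60000);           // final call closes the server-side scanner
 * }</pre>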
 */
@InterfaceAudience.Private
public class ScannerCallable extends RegionServerCallable<Result[]> {
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
  protected long scannerId = -1L;
  protected boolean instantiated = false;
  protected boolean closed = false;
  private Scan scan;
  private int caching = 1;
  protected final ClusterConnection cConnection;
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  private static String myAddress;
  protected final int id;
  static {
    try {
      myAddress = DNS.getDefaultHost("default", "default");
    } catch (UnknownHostException uhe) {
      LOG.error("cannot determine my address", uhe);
    }
  }

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  private long nextCallSeq = 0;
  protected RpcControllerFactory controllerFactory;
  protected PayloadCarryingRpcController controller;

  /**
   * @param connection the cluster connection to use
   * @param tableName the table this callable operates on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null, ScannerCallable won't collect
   *          metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
    this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
  }

  /**
   * @param connection the cluster connection to use
   * @param tableName the table this callable operates on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use; if null, ScannerCallable won't collect metrics
   * @param rpcControllerFactory factory to use when creating
   *        {@link com.google.protobuf.RpcController}
   * @param id the replica id
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
    super(connection, tableName, scan.getStartRow());
    this.id = id;
    this.cConnection = connection;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.controllerFactory = rpcControllerFactory;
  }

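  /**
   * @return the {@link PayloadCarryingRpcController} used by the most recent scan RPC,
   *         or null if no scan RPC has been issued yet
   */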
  PayloadCarryingRpcController getController() {
    return controller;
  }

  /**
   * Locates the region for the configured replica and sets up the client stub.
   * @param reload force reload of server location
   * @throws IOException if interrupted or if the replica has no known location
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
        id, getConnection(), getTableName(), getRow());
    location = id < rl.size() ? rl.getRegionLocation(id) : null;
    if (location == null || location.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      //  when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }

    // Check how often we retry.
    // HConnectionManager will call instantiateServer with reload==true
    // if and only if it is retrying.
    if (reload && this.scanMetrics != null) {
      this.scanMetrics.countOfRPCRetries.incrementAndGet();
      if (isRegionServerRemote) {
        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
      }
    }
  }

  /**
   * Compares the local machine's hostname with the region server's hostname
   * to decide whether the HBase client is connecting to a remote region server.
   */
  protected void checkIfRegionServerIsRemote() {
    if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
      isRegionServerRemote = false;
    } else {
      isRegionServerRemote = true;
    }
  }

  @Override
  @SuppressWarnings("deprecation")
  public Result [] call(int callTimeout) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      if (scannerId != -1) {
        close();
      }
    } else {
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        try {
          incRPCcallsMetrics();
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;
          controller = controllerFactory.newController();
          controller.setPriority(getTableName());
          controller.setCallTimeout(callTimeout);
          try {
            response = getStub().scan(controller, request);
            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
            // from client to server will increment this number on both sides. Client passes this
            // number along with the request, and at the RS side the incoming nextCallSeq and its
            // own nextCallSeq are matched. In case of a timeout this increment at the client side
            // should not happen. If at the server side fetching of the next batch of data was over,
            // there will be a mismatch in the nextCallSeq number. Server will throw
            // OutOfOrderScannerNextException and then the client will reopen the scanner with startrow
            // as the last successfully retrieved row.
            // See HBASE-5974
            nextCallSeq++;
            long timestamp = System.currentTimeMillis();
            // Results are returned via controller
            CellScanner cellScanner = controller.cellScanner();
            rrs = ResponseConverter.getResults(cellScanner, response);
            if (logScannerActivity) {
              long now = System.currentTimeMillis();
              if (now - timestamp > logCutOffLatency) {
                int rows = rrs == null ? 0 : rrs.length;
                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                  + rows + " rows from scanner=" + scannerId);
              }
            }
            if (response.hasMoreResults()
                && !response.getMoreResults()) {
              scannerId = -1L;
              closed = true;
              return null;
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
          updateResultsMetrics(rrs);
        } catch (IOException e) {
          if (logScannerActivity) {
            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
              + " to " + getLocation(), e);
          }
          IOException ioe = e;
          if (e instanceof RemoteException) {
            ioe = ((RemoteException) e).unwrapRemoteException();
          }
          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
            try {
              HRegionLocation location =
                getConnection().relocateRegion(getTableName(), scan.getStartRow());
              LOG.info("Scanner=" + scannerId
                + " expired, current region location is " + location.toString());
            } catch (Throwable t) {
              LOG.info("Failed to relocate region", t);
            }
          }
          // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
          // Why not just have these exceptions implement DNRIOE, you ask?  Well, usually we want
          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
          // a scan, when doing a next in particular, we want to break out and get the scanner to
          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (it's ugly,
          // yeah, and hard to follow and in need of a refactor).
          if (ioe instanceof NotServingRegionException) {
            // Throw a DNRE so that we break out of the cycle of calling NSRE
            // when what we need is to open the scanner against a new location.
            // Attach the NSRE to signal the client that it needs to re-setup the scanner.
            if (this.scanMetrics != null) {
              this.scanMetrics.countOfNSRE.incrementAndGet();
            }
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else if (ioe instanceof RegionServerStoppedException) {
            // Throw a DNRE so that we break out of the cycle of retries and instead go and
            // open the scanner against a new location.
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else {
            // The outer layers will retry
            throw ioe;
          }
        }
        return rrs;
      }
    }
    return null;
  }

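  /**
   * Bumps the RPC call counters in the scan metrics (overall and, if the target region server
   * is remote, the remote counter). No-op when metrics collection is disabled.
   */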
  private void incRPCcallsMetrics() {
    if (this.scanMetrics == null) {
      return;
    }
    this.scanMetrics.countOfRPCcalls.incrementAndGet();
    if (isRegionServerRemote) {
      this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
    }
  }

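  /**
   * Adds the estimated serialized size of the returned cells to the scan metrics,
   * counting remote results separately. No-op when metrics collection is disabled
   * or the batch is empty.
   */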
  private void updateResultsMetrics(Result[] rrs) {
    if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
      return;
    }
    long resultSize = 0;
    for (Result rr : rrs) {
      for (Cell cell : rr.rawCells()) {
        resultSize += CellUtil.estimatedSerializedSizeOf(cell);
      }
    }
    this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
    if (isRegionServerRemote) {
      this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
    }
  }

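  /**
   * Closes the scanner on the region server, if one is open. Failures are logged and
   * swallowed since the scanner may already have been closed or expired on the server.
   */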
  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCcallsMetrics();
      ScanRequest request =
        RequestConverter.buildScanRequest(this.scannerId, 0, true);
      try {
        getStub().scan(null, request);
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      }
    } catch (IOException e) {
      LOG.warn("Ignore, probably already closed", e);
    }
    this.scannerId = -1L;
  }

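  /**
   * Opens a scanner against the current region for this callable's scan.
   * @return the server-assigned scanner id
   * @throws IOException if the open RPC fails
   */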
  protected long openScanner() throws IOException {
    incRPCcallsMetrics();
    ScanRequest request =
      RequestConverter.buildScanRequest(
        getLocation().getRegionInfo().getRegionName(),
        this.scan, 0, false);
    try {
      ScanResponse response = getStub().scan(null, request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
          + " on region " + getLocation().toString());
      }
      return id;
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * @return the HRegionInfo for the current region
   */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on each next() call
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  @Override
  public ClusterConnection getConnection() {
    return cConnection;
  }

  /**
   * Set the number of rows that will be fetched on each next() call
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

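  /**
   * Creates a copy of this callable, with the same scan, metrics, and caching settings,
   * bound to the given replica.
   * @param id the replica id to target
   * @return the replica-bound copy
   */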
  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
        this.getScan(), this.scanMetrics, controllerFactory, id);
    s.setCaching(this.caching);
    return s;
  }
}