View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.DoNotRetryIOException;
32  import org.apache.hadoop.hbase.HBaseIOException;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HRegionLocation;
35  import org.apache.hadoop.hbase.NotServingRegionException;
36  import org.apache.hadoop.hbase.RegionLocations;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.UnknownScannerException;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
42  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
43  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
44  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
45  import org.apache.hadoop.hbase.protobuf.RequestConverter;
46  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
47  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
48  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
49  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
50  import org.apache.hadoop.ipc.RemoteException;
51  import org.apache.hadoop.net.DNS;
52  
53  import com.google.protobuf.RpcController;
54  import com.google.protobuf.ServiceException;
55  import com.google.protobuf.TextFormat;
56  
57  /**
58   * Scanner operations such as create, next, etc.
59   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
60   * {@link RpcRetryingCaller} so fails are retried.
61   */
62  @InterfaceAudience.Private
63  public class ScannerCallable extends RegionServerCallable<Result[]> {
64    public static final String LOG_SCANNER_LATENCY_CUTOFF
65      = "hbase.client.log.scanner.latency.cutoff";
66    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
67  
68    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
69    protected long scannerId = -1L;
70    protected boolean instantiated = false;
71    protected boolean closed = false;
72    private Scan scan;
73    private int caching = 1;
74    protected final ClusterConnection cConnection;
75    protected ScanMetrics scanMetrics;
76    private boolean logScannerActivity = false;
77    private int logCutOffLatency = 1000;
78    private static String myAddress;
79    protected final int id;
80    static {
81      try {
82        myAddress = DNS.getDefaultHost("default", "default");
83      } catch (UnknownHostException uhe) {
84        LOG.error("cannot determine my address", uhe);
85      }
86    }
87  
88    // indicate if it is a remote server call
89    protected boolean isRegionServerRemote = true;
90    private long nextCallSeq = 0;
91    protected RpcControllerFactory controllerFactory;
92  
93    /**
94     * @param connection which connection
95     * @param tableName table callable is on
96     * @param scan the scan to execute
97     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
98     *          metrics
99     * @param rpcControllerFactory factory to use when creating {@link RpcController}
100    */
101   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
102       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
103     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
104   }
105   /**
106    *
107    * @param connection
108    * @param tableName
109    * @param scan
110    * @param scanMetrics
111    * @param id the replicaId
112    */
113   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
114       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
115     super(connection, tableName, scan.getStartRow());
116     this.id = id;
117     this.cConnection = connection;
118     this.scan = scan;
119     this.scanMetrics = scanMetrics;
120     Configuration conf = connection.getConfiguration();
121     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
122     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
123     this.controllerFactory = rpcControllerFactory;
124   }
125 
126   /**
127    * @param reload force reload of server location
128    * @throws IOException
129    */
130   @Override
131   public void prepare(boolean reload) throws IOException {
132     if (Thread.interrupted()) {
133       throw new InterruptedIOException();
134     }
135     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
136         id, getConnection(), getTableName(), getRow());
137     location = id < rl.size() ? rl.getRegionLocation(id) : null;
138     if (location == null || location.getServerName() == null) {
139       // With this exception, there will be a retry. The location can be null for a replica
140       //  when the table is created or after a split.
141       throw new HBaseIOException("There is no location for replica id #" + id);
142     }
143     ServerName dest = location.getServerName();
144     setStub(super.getConnection().getClient(dest));
145     if (!instantiated || reload) {
146       checkIfRegionServerIsRemote();
147       instantiated = true;
148     }
149 
150     // check how often we retry.
151     // HConnectionManager will call instantiateServer with reload==true
152     // if and only if for retries.
153     if (reload && this.scanMetrics != null) {
154       this.scanMetrics.countOfRPCRetries.incrementAndGet();
155       if (isRegionServerRemote) {
156         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
157       }
158     }
159   }
160 
161   /**
162    * compare the local machine hostname with region server's hostname
163    * to decide if hbase client connects to a remote region server
164    */
165   protected void checkIfRegionServerIsRemote() {
166     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
167       isRegionServerRemote = false;
168     } else {
169       isRegionServerRemote = true;
170     }
171   }
172 
173 
174   @Override
175   @SuppressWarnings("deprecation")
176   public Result [] call(int callTimeout) throws IOException {
177     if (Thread.interrupted()) {
178       throw new InterruptedIOException();
179     }
180     if (closed) {
181       if (scannerId != -1) {
182         close();
183       }
184     } else {
185       if (scannerId == -1L) {
186         this.scannerId = openScanner();
187       } else {
188         Result [] rrs = null;
189         ScanRequest request = null;
190         try {
191           incRPCcallsMetrics();
192           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
193           ScanResponse response = null;
194           PayloadCarryingRpcController controller = controllerFactory.newController();
195           controller.setPriority(getTableName());
196           controller.setCallTimeout(callTimeout);
197           try {
198             response = getStub().scan(controller, request);
199             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
200             // from client to server will increment this number in both sides. Client passes this
201             // number along with the request and at RS side both the incoming nextCallSeq and its
202             // nextCallSeq will be matched. In case of a timeout this increment at the client side
203             // should not happen. If at the server side fetching of next batch of data was over,
204             // there will be mismatch in the nextCallSeq number. Server will throw
205             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
206             // as the last successfully retrieved row.
207             // See HBASE-5974
208             nextCallSeq++;
209             long timestamp = System.currentTimeMillis();
210             // Results are returned via controller
211             CellScanner cellScanner = controller.cellScanner();
212             rrs = ResponseConverter.getResults(cellScanner, response);
213             if (logScannerActivity) {
214               long now = System.currentTimeMillis();
215               if (now - timestamp > logCutOffLatency) {
216                 int rows = rrs == null ? 0 : rrs.length;
217                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
218                   + rows + " rows from scanner=" + scannerId);
219               }
220             }
221             if (response.hasMoreResults()
222                 && !response.getMoreResults()) {
223               scannerId = -1L;
224               closed = true;
225               return null;
226             }
227           } catch (ServiceException se) {
228             throw ProtobufUtil.getRemoteException(se);
229           }
230           updateResultsMetrics(rrs);
231         } catch (IOException e) {
232           if (logScannerActivity) {
233             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
234               + " to " + getLocation(), e);
235           }
236           IOException ioe = e;
237           if (e instanceof RemoteException) {
238             ioe = ((RemoteException) e).unwrapRemoteException();
239           }
240           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
241             try {
242               HRegionLocation location =
243                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
244               LOG.info("Scanner=" + scannerId
245                 + " expired, current region location is " + location.toString());
246             } catch (Throwable t) {
247               LOG.info("Failed to relocate region", t);
248             }
249           }
250           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
251           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
252           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
253           // a scan when doing a next in particular, we want to break out and get the scanner to
254           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
255           // yeah and hard to follow and in need of a refactor).
256           if (ioe instanceof NotServingRegionException) {
257             // Throw a DNRE so that we break out of cycle of calling NSRE
258             // when what we need is to open scanner against new location.
259             // Attach NSRE to signal client that it needs to re-setup scanner.
260             if (this.scanMetrics != null) {
261               this.scanMetrics.countOfNSRE.incrementAndGet();
262             }
263             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
264           } else if (ioe instanceof RegionServerStoppedException) {
265             // Throw a DNRE so that we break out of cycle of the retries and instead go and
266             // open scanner against new location.
267             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
268           } else {
269             // The outer layers will retry
270             throw ioe;
271           }
272         }
273         return rrs;
274       }
275     }
276     return null;
277   }
278 
279   private void incRPCcallsMetrics() {
280     if (this.scanMetrics == null) {
281       return;
282     }
283     this.scanMetrics.countOfRPCcalls.incrementAndGet();
284     if (isRegionServerRemote) {
285       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
286     }
287   }
288 
289   private void updateResultsMetrics(Result[] rrs) {
290     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
291       return;
292     }
293     long resultSize = 0;
294     for (Result rr : rrs) {
295       for (Cell cell : rr.rawCells()) {
296         resultSize += CellUtil.estimatedSerializedSizeOf(cell);
297       }
298     }
299     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
300     if (isRegionServerRemote) {
301       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
302     }
303   }
304 
305   private void close() {
306     if (this.scannerId == -1L) {
307       return;
308     }
309     try {
310       incRPCcallsMetrics();
311       ScanRequest request =
312         RequestConverter.buildScanRequest(this.scannerId, 0, true);
313       try {
314         getStub().scan(null, request);
315       } catch (ServiceException se) {
316         throw ProtobufUtil.getRemoteException(se);
317       }
318     } catch (IOException e) {
319       LOG.warn("Ignore, probably already closed", e);
320     }
321     this.scannerId = -1L;
322   }
323 
324   protected long openScanner() throws IOException {
325     incRPCcallsMetrics();
326     ScanRequest request =
327       RequestConverter.buildScanRequest(
328         getLocation().getRegionInfo().getRegionName(),
329         this.scan, 0, false);
330     try {
331       ScanResponse response = getStub().scan(null, request);
332       long id = response.getScannerId();
333       if (logScannerActivity) {
334         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
335           + " on region " + getLocation().toString());
336       }
337       return id;
338     } catch (ServiceException se) {
339       throw ProtobufUtil.getRemoteException(se);
340     }
341   }
342 
343   protected Scan getScan() {
344     return scan;
345   }
346 
347   /**
348    * Call this when the next invocation of call should close the scanner
349    */
350   public void setClose() {
351     this.closed = true;
352   }
353 
354   /**
355    * @return the HRegionInfo for the current region
356    */
357   @Override
358   public HRegionInfo getHRegionInfo() {
359     if (!instantiated) {
360       return null;
361     }
362     return getLocation().getRegionInfo();
363   }
364 
365   /**
366    * Get the number of rows that will be fetched on next
367    * @return the number of rows for caching
368    */
369   public int getCaching() {
370     return caching;
371   }
372 
373   @Override
374   public ClusterConnection getConnection() {
375     return cConnection;
376   }
377 
378   /**
379    * Set the number of rows that will be fetched on next
380    * @param caching the number of rows for caching
381    */
382   public void setCaching(int caching) {
383     this.caching = caching;
384   }
385 
386   public ScannerCallable getScannerCallableForReplica(int id) {
387     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
388         this.getScan(), this.scanMetrics, controllerFactory, id);
389     s.setCaching(this.caching);
390     return s;
391   }
392 }