View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellScanner;
31  import org.apache.hadoop.hbase.DoNotRetryIOException;
32  import org.apache.hadoop.hbase.HBaseIOException;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HRegionLocation;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.NotServingRegionException;
37  import org.apache.hadoop.hbase.RegionLocations;
38  import org.apache.hadoop.hbase.RemoteExceptionHandler;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.UnknownScannerException;
42  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
43  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
44  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
45  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
46  import org.apache.hadoop.hbase.protobuf.RequestConverter;
47  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
48  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
49  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
50  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
51  import org.apache.hadoop.ipc.RemoteException;
52  import org.apache.hadoop.net.DNS;
53  
54  import com.google.protobuf.RpcController;
55  import com.google.protobuf.ServiceException;
56  import com.google.protobuf.TextFormat;
57  
58  /**
59   * Scanner operations such as create, next, etc.
60   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
61   * {@link RpcRetryingCaller} so fails are retried.
62   */
63  @InterfaceAudience.Private
64  public class ScannerCallable extends RegionServerCallable<Result[]> {
65    public static final String LOG_SCANNER_LATENCY_CUTOFF
66      = "hbase.client.log.scanner.latency.cutoff";
67    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
68  
69    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
70    protected long scannerId = -1L;
71    protected boolean instantiated = false;
72    protected boolean closed = false;
73    private Scan scan;
74    private int caching = 1;
75    protected final ClusterConnection cConnection;
76    protected ScanMetrics scanMetrics;
77    private boolean logScannerActivity = false;
78    private int logCutOffLatency = 1000;
79    private static String myAddress;
80    protected final int id;
81    static {
82      try {
83        myAddress = DNS.getDefaultHost("default", "default");
84      } catch (UnknownHostException uhe) {
85        LOG.error("cannot determine my address", uhe);
86      }
87    }
88  
89    // indicate if it is a remote server call
90    protected boolean isRegionServerRemote = true;
91    private long nextCallSeq = 0;
92    protected RpcControllerFactory controllerFactory;
93  
94    /**
95     * @param connection which connection
96     * @param tableName table callable is on
97     * @param scan the scan to execute
98     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
99     *          metrics
100    * @param rpcControllerFactory factory to use when creating {@link RpcController}
101    */
102   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
103       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
104     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
105   }
106   /**
107    *
108    * @param connection
109    * @param tableName
110    * @param scan
111    * @param scanMetrics
112    * @param id the replicaId
113    */
114   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
115       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
116     super(connection, tableName, scan.getStartRow());
117     this.id = id;
118     this.cConnection = connection;
119     this.scan = scan;
120     this.scanMetrics = scanMetrics;
121     Configuration conf = connection.getConfiguration();
122     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
123     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
124     this.controllerFactory = rpcControllerFactory;
125   }
126 
127   /**
128    * @param reload force reload of server location
129    * @throws IOException
130    */
131   @Override
132   public void prepare(boolean reload) throws IOException {
133     if (Thread.interrupted()) {
134       throw new InterruptedIOException();
135     }
136     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
137         id, getConnection(), getTableName(), getRow());
138     location = id < rl.size() ? rl.getRegionLocation(id) : null;
139     if (location == null || location.getServerName() == null) {
140       // With this exception, there will be a retry. The location can be null for a replica
141       //  when the table is created or after a split.
142       throw new HBaseIOException("There is no location for replica id #" + id);
143     }
144     ServerName dest = location.getServerName();
145     setStub(super.getConnection().getClient(dest));
146     if (!instantiated || reload) {
147       checkIfRegionServerIsRemote();
148       instantiated = true;
149     }
150 
151     // check how often we retry.
152     // HConnectionManager will call instantiateServer with reload==true
153     // if and only if for retries.
154     if (reload && this.scanMetrics != null) {
155       this.scanMetrics.countOfRPCRetries.incrementAndGet();
156       if (isRegionServerRemote) {
157         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
158       }
159     }
160   }
161 
162   /**
163    * compare the local machine hostname with region server's hostname
164    * to decide if hbase client connects to a remote region server
165    */
166   protected void checkIfRegionServerIsRemote() {
167     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
168       isRegionServerRemote = false;
169     } else {
170       isRegionServerRemote = true;
171     }
172   }
173 
174 
175   @Override
176   @SuppressWarnings("deprecation")
177   public Result [] call(int callTimeout) throws IOException {
178     if (Thread.interrupted()) {
179       throw new InterruptedIOException();
180     }
181     if (closed) {
182       if (scannerId != -1) {
183         close();
184       }
185     } else {
186       if (scannerId == -1L) {
187         this.scannerId = openScanner();
188       } else {
189         Result [] rrs = null;
190         ScanRequest request = null;
191         try {
192           incRPCcallsMetrics();
193           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
194           ScanResponse response = null;
195           PayloadCarryingRpcController controller = controllerFactory.newController();
196           controller.setPriority(getTableName());
197           controller.setCallTimeout(callTimeout);
198           try {
199             response = getStub().scan(controller, request);
200             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
201             // from client to server will increment this number in both sides. Client passes this
202             // number along with the request and at RS side both the incoming nextCallSeq and its
203             // nextCallSeq will be matched. In case of a timeout this increment at the client side
204             // should not happen. If at the server side fetching of next batch of data was over,
205             // there will be mismatch in the nextCallSeq number. Server will throw
206             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
207             // as the last successfully retrieved row.
208             // See HBASE-5974
209             nextCallSeq++;
210             long timestamp = System.currentTimeMillis();
211             // Results are returned via controller
212             CellScanner cellScanner = controller.cellScanner();
213             rrs = ResponseConverter.getResults(cellScanner, response);
214             if (logScannerActivity) {
215               long now = System.currentTimeMillis();
216               if (now - timestamp > logCutOffLatency) {
217                 int rows = rrs == null ? 0 : rrs.length;
218                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
219                   + rows + " rows from scanner=" + scannerId);
220               }
221             }
222             if (response.hasMoreResults()
223                 && !response.getMoreResults()) {
224               scannerId = -1L;
225               closed = true;
226               return null;
227             }
228           } catch (ServiceException se) {
229             throw ProtobufUtil.getRemoteException(se);
230           }
231           updateResultsMetrics(rrs);
232         } catch (IOException e) {
233           if (logScannerActivity) {
234             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
235               + " to " + getLocation(), e);
236           }
237           IOException ioe = e;
238           if (e instanceof RemoteException) {
239             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
240           }
241           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
242             try {
243               HRegionLocation location =
244                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
245               LOG.info("Scanner=" + scannerId
246                 + " expired, current region location is " + location.toString());
247             } catch (Throwable t) {
248               LOG.info("Failed to relocate region", t);
249             }
250           }
251           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
252           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
253           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
254           // a scan when doing a next in particular, we want to break out and get the scanner to
255           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
256           // yeah and hard to follow and in need of a refactor).
257           if (ioe instanceof NotServingRegionException) {
258             // Throw a DNRE so that we break out of cycle of calling NSRE
259             // when what we need is to open scanner against new location.
260             // Attach NSRE to signal client that it needs to re-setup scanner.
261             if (this.scanMetrics != null) {
262               this.scanMetrics.countOfNSRE.incrementAndGet();
263             }
264             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
265           } else if (ioe instanceof RegionServerStoppedException) {
266             // Throw a DNRE so that we break out of cycle of the retries and instead go and
267             // open scanner against new location.
268             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
269           } else {
270             // The outer layers will retry
271             throw ioe;
272           }
273         }
274         return rrs;
275       }
276     }
277     return null;
278   }
279 
280   private void incRPCcallsMetrics() {
281     if (this.scanMetrics == null) {
282       return;
283     }
284     this.scanMetrics.countOfRPCcalls.incrementAndGet();
285     if (isRegionServerRemote) {
286       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
287     }
288   }
289 
290   private void updateResultsMetrics(Result[] rrs) {
291     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
292       return;
293     }
294     long resultSize = 0;
295     for (Result rr : rrs) {
296       for (Cell kv : rr.rawCells()) {
297         // TODO add getLength to Cell/use CellUtil#estimatedSizeOf
298         resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
299       }
300     }
301     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
302     if (isRegionServerRemote) {
303       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
304     }
305   }
306 
307   private void close() {
308     if (this.scannerId == -1L) {
309       return;
310     }
311     try {
312       incRPCcallsMetrics();
313       ScanRequest request =
314         RequestConverter.buildScanRequest(this.scannerId, 0, true);
315       try {
316         getStub().scan(null, request);
317       } catch (ServiceException se) {
318         throw ProtobufUtil.getRemoteException(se);
319       }
320     } catch (IOException e) {
321       LOG.warn("Ignore, probably already closed", e);
322     }
323     this.scannerId = -1L;
324   }
325 
326   protected long openScanner() throws IOException {
327     incRPCcallsMetrics();
328     ScanRequest request =
329       RequestConverter.buildScanRequest(
330         getLocation().getRegionInfo().getRegionName(),
331         this.scan, 0, false);
332     try {
333       ScanResponse response = getStub().scan(null, request);
334       long id = response.getScannerId();
335       if (logScannerActivity) {
336         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
337           + " on region " + getLocation().toString());
338       }
339       return id;
340     } catch (ServiceException se) {
341       throw ProtobufUtil.getRemoteException(se);
342     }
343   }
344 
345   protected Scan getScan() {
346     return scan;
347   }
348 
349   /**
350    * Call this when the next invocation of call should close the scanner
351    */
352   public void setClose() {
353     this.closed = true;
354   }
355 
356   /**
357    * @return the HRegionInfo for the current region
358    */
359   @Override
360   public HRegionInfo getHRegionInfo() {
361     if (!instantiated) {
362       return null;
363     }
364     return getLocation().getRegionInfo();
365   }
366 
367   /**
368    * Get the number of rows that will be fetched on next
369    * @return the number of rows for caching
370    */
371   public int getCaching() {
372     return caching;
373   }
374 
375   @Override
376   public ClusterConnection getConnection() {
377     return cConnection;
378   }
379 
380   /**
381    * Set the number of rows that will be fetched on next
382    * @param caching the number of rows for caching
383    */
384   public void setCaching(int caching) {
385     this.caching = caching;
386   }
387 
388   public ScannerCallable getScannerCallableForReplica(int id) {
389     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
390         this.getScan(), this.scanMetrics, controllerFactory, id);
391     s.setCaching(this.caching);
392     return s;
393   }
394 }