1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.protobuf.ServiceException;
22  import com.google.protobuf.TextFormat;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.classification.InterfaceStability;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HRegionInfo;
30  import org.apache.hadoop.hbase.HRegionLocation;
31  import org.apache.hadoop.hbase.RemoteExceptionHandler;
32  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
33  import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
34  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
35  import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
36  import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.RequestConverter;
39  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
40  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
41  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
42  import org.apache.hadoop.ipc.RemoteException;
43  import org.apache.hadoop.net.DNS;
44  
45  import java.io.IOException;
46  import java.net.UnknownHostException;
47  
48  /**
49   * Retries scanner operations such as create, next, etc.
50   * Used by {@link ResultScanner}s made by {@link HTable}.
51   */
52  @InterfaceAudience.Public
53  @InterfaceStability.Stable
54  public class ScannerCallable extends ServerCallable<Result[]> {
55    public static final String LOG_SCANNER_LATENCY_CUTOFF
56      = "hbase.client.log.scanner.latency.cutoff";
57    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
58  
59    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
60    private long scannerId = -1L;
61    private boolean instantiated = false;
62    private boolean closed = false;
63    private Scan scan;
64    private int caching = 1;
65    private ScanMetrics scanMetrics;
66    private boolean logScannerActivity = false;
67    private int logCutOffLatency = 1000;
68  
69    // indicate if it is a remote server call
70    private boolean isRegionServerRemote = true;
71    private long nextCallSeq = 0;
72    
73    /**
74     * @param connection which connection
75     * @param tableName table callable is on
76     * @param scan the scan to execute
77     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
78     * won't collect metrics
79     */
80    public ScannerCallable (HConnection connection, byte [] tableName, Scan scan,
81      ScanMetrics scanMetrics) {
82      super(connection, tableName, scan.getStartRow());
83      this.scan = scan;
84      this.scanMetrics = scanMetrics;
85      Configuration conf = connection.getConfiguration();
86      logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
87      logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
88    }
89  
90    /**
91     * @param reload force reload of server location
92     * @throws IOException
93     */
94    @Override
95    public void prepare(boolean reload) throws IOException {
96      if (!instantiated || reload) {
97        super.prepare(reload);
98        checkIfRegionServerIsRemote();
99        instantiated = true;
100     }
101 
102     // check how often we retry.
103     // HConnectionManager will call instantiateServer with reload==true
104     // if and only if for retries.
105     if (reload && this.scanMetrics != null) {
106       this.scanMetrics.countOfRPCRetries.incrementAndGet();
107       if (isRegionServerRemote) {
108         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
109       }
110     }
111   }
112 
113   /**
114    * compare the local machine hostname with region server's hostname
115    * to decide if hbase client connects to a remote region server
116    * @throws UnknownHostException.
117    */
118   private void checkIfRegionServerIsRemote() throws UnknownHostException {
119     String myAddress = DNS.getDefaultHost("default", "default");
120     if (this.location.getHostname().equalsIgnoreCase(myAddress)) {
121       isRegionServerRemote = false;
122     } else {
123       isRegionServerRemote = true;
124     }
125   }
126 
127   /**
128    * @see java.util.concurrent.Callable#call()
129    */
130   public Result [] call() throws IOException {
131     if (closed) {
132       if (scannerId != -1) {
133         close();
134       }
135     } else {
136       if (scannerId == -1L) {
137         this.scannerId = openScanner();
138       } else {
139         Result [] rrs = null;
140         ScanRequest request = null;
141         try {
142           incRPCcallsMetrics();
143           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
144           ScanResponse response = null;
145           try {
146             response = stub.scan(null, request);
147             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
148             // from client to server will increment this number in both sides. Client passes this
149             // number along with the request and at RS side both the incoming nextCallSeq and its
150             // nextCallSeq will be matched. In case of a timeout this increment at the client side
151             // should not happen. If at the server side fetching of next batch of data was over,
152             // there will be mismatch in the nextCallSeq number. Server will throw
153             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
154             // as the last successfully retrieved row.
155             // See HBASE-5974
156             nextCallSeq++;
157             long timestamp = System.currentTimeMillis();
158             rrs = ResponseConverter.getResults(response);
159             if (logScannerActivity) {
160               long now = System.currentTimeMillis();
161               if (now - timestamp > logCutOffLatency) {
162                 int rows = rrs == null ? 0 : rrs.length;
163                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
164                   + rows + " rows from scanner=" + scannerId);
165               }
166             }
167             if (response.hasMoreResults()
168                 && !response.getMoreResults()) {
169               scannerId = -1L;
170               closed = true;
171               return null;
172             }
173           } catch (ServiceException se) {
174             throw ProtobufUtil.getRemoteException(se);
175           }
176           updateResultsMetrics(response);
177         } catch (IOException e) {
178           if (logScannerActivity) {
179             LOG.info("Got exception making request " + TextFormat.shortDebugString(request), e);
180           }
181           IOException ioe = e;
182           if (e instanceof RemoteException) {
183             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
184           }
185           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
186             try {
187               HRegionLocation location =
188                 connection.relocateRegion(tableName, scan.getStartRow());
189               LOG.info("Scanner=" + scannerId
190                 + " expired, current region location is " + location.toString()
191                 + " ip:" + location.getHostnamePort());
192             } catch (Throwable t) {
193               LOG.info("Failed to relocate region", t);
194             }
195           }
196           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
197           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
198           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
199           // a scan when doing a next in particular, we want to break out and get the scanner to
200           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
201           // yeah and hard to follow and in need of a refactor).
202           if (ioe instanceof NotServingRegionException) {
203             // Throw a DNRE so that we break out of cycle of calling NSRE
204             // when what we need is to open scanner against new location.
205             // Attach NSRE to signal client that it needs to re-setup scanner.
206             if (this.scanMetrics != null) {
207               this.scanMetrics.countOfNSRE.incrementAndGet();
208             }
209             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
210           } else if (ioe instanceof RegionServerStoppedException) {
211             // Throw a DNRE so that we break out of cycle of the retries and instead go and
212             // open scanner against new location.
213             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
214           } else {
215             // The outer layers will retry
216             throw ioe;
217           }
218         }
219         return rrs;
220       }
221     }
222     return null;
223   }
224 
225   private void incRPCcallsMetrics() {
226     if (this.scanMetrics == null) {
227       return;
228     }
229     this.scanMetrics.countOfRPCcalls.incrementAndGet();
230     if (isRegionServerRemote) {
231       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
232     }
233   }
234 
235   private void updateResultsMetrics(ScanResponse response) {
236     if (this.scanMetrics == null || !response.hasResultSizeBytes()) {
237       return;
238     }
239     long value = response.getResultSizeBytes();
240     this.scanMetrics.countOfBytesInResults.addAndGet(value);
241     if (isRegionServerRemote) {
242       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(value);
243     }
244   }
245 
246   private void close() {
247     if (this.scannerId == -1L) {
248       return;
249     }
250     try {
251       incRPCcallsMetrics();
252       ScanRequest request =
253         RequestConverter.buildScanRequest(this.scannerId, 0, true);
254       try {
255         stub.scan(null, request);
256       } catch (ServiceException se) {
257         throw ProtobufUtil.getRemoteException(se);
258       }
259     } catch (IOException e) {
260       LOG.warn("Ignore, probably already closed", e);
261     }
262     this.scannerId = -1L;
263   }
264 
265   protected long openScanner() throws IOException {
266     incRPCcallsMetrics();
267     ScanRequest request =
268       RequestConverter.buildScanRequest(
269         this.location.getRegionInfo().getRegionName(),
270         this.scan, 0, false);
271     try {
272       ScanResponse response = stub.scan(null, request);
273       long id = response.getScannerId();
274       if (logScannerActivity) {
275         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
276           + " on region " + this.location.toString() + " ip:"
277           + this.location.getHostnamePort());
278       }
279       return id;
280     } catch (ServiceException se) {
281       throw ProtobufUtil.getRemoteException(se);
282     }
283   }
284 
285   protected Scan getScan() {
286     return scan;
287   }
288 
289   /**
290    * Call this when the next invocation of call should close the scanner
291    */
292   public void setClose() {
293     this.closed = true;
294   }
295 
296   /**
297    * @return the HRegionInfo for the current region
298    */
299   public HRegionInfo getHRegionInfo() {
300     if (!instantiated) {
301       return null;
302     }
303     return location.getRegionInfo();
304   }
305 
306   /**
307    * Get the number of rows that will be fetched on next
308    * @return the number of rows for caching
309    */
310   public int getCaching() {
311     return caching;
312   }
313 
314   /**
315    * Set the number of rows that will be fetched on next
316    * @param caching the number of rows for caching
317    */
318   public void setCaching(int caching) {
319     this.caching = caching;
320   }
321 }