View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.net.UnknownHostException;
23  
24  import org.apache.commons.lang.exception.ExceptionUtils;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
29  import org.apache.hadoop.hbase.CallSequenceOutOfOrderException;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.NotServingRegionException;
34  import org.apache.hadoop.hbase.RemoteExceptionHandler;
35  import org.apache.hadoop.hbase.UnknownScannerException;
36  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
37  import org.apache.hadoop.ipc.RemoteException;
38  import org.apache.hadoop.net.DNS;
39  
40  /**
41   * Retries scanner operations such as create, next, etc.
42   * Used by {@link ResultScanner}s made by {@link HTable}.
43   */
44  public class ScannerCallable extends ServerCallable<Result[]> {
45    public static final String LOG_SCANNER_LATENCY_CUTOFF
46      = "hbase.client.log.scanner.latency.cutoff";
47    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
48    private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
49    private long scannerId = -1L;
50    private boolean instantiated = false;
51    private boolean closed = false;
52    private Scan scan;
53    private int caching = 1;
54    private ScanMetrics scanMetrics;
55    private boolean logScannerActivity = false;
56    private int logCutOffLatency = 1000;
57    private static String myAddress;
58    static {
59      try {
60        myAddress = DNS.getDefaultHost("default", "default");
61      } catch (UnknownHostException uhe) {
62        LOG.error("cannot determine my address", uhe);
63      }
64    }
65  
66    // indicate if it is a remote server call
67    private boolean isRegionServerRemote = true;
68    private long callSeq = 0;
69    private boolean useCallSeq = true;
70  
71    /**
72     * @param connection which connection
73     * @param tableName table callable is on
74     * @param scan the scan to execute
75     * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
76     * won't collect metrics
77     */
78    public ScannerCallable (HConnection connection, byte [] tableName, Scan scan,
79      ScanMetrics scanMetrics) {
80      super(connection, tableName, scan.getStartRow());
81      this.scan = scan;
82      this.scanMetrics = scanMetrics;
83      Configuration conf = connection.getConfiguration();
84      logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
85      logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
86    }
87  
88    /**
89     * @param reload force reload of server location
90     * @throws IOException
91     */
92    @Override
93    public void connect(boolean reload) throws IOException {
94      if (!instantiated || reload) {
95        super.connect(reload);
96        checkIfRegionServerIsRemote();
97        instantiated = true;
98      }
99  
100     // check how often we retry.
101     // HConnectionManager will call instantiateServer with reload==true
102     // if and only if for retries.
103     if (reload && this.scanMetrics != null) {
104       this.scanMetrics.countOfRPCRetries.inc();
105       if (isRegionServerRemote) {
106         this.scanMetrics.countOfRemoteRPCRetries.inc();
107       }
108     }
109   }
110 
111   /**
112    * compare the local machine hostname with region server's hostname
113    * to decide if hbase client connects to a remote region server
114    */
115   private void checkIfRegionServerIsRemote() {
116     if (this.location.getHostname().equalsIgnoreCase(myAddress)) {
117       isRegionServerRemote = false;
118     } else {
119       isRegionServerRemote = true;
120     }
121   }
122 
123   /**
124    * @see java.util.concurrent.Callable#call()
125    */
126   public Result [] call() throws IOException {
127     if (scannerId != -1L && closed) {
128       close();
129     } else if (scannerId == -1L && !closed) {
130       this.scannerId = openScanner();
131     } else {
132       Result [] rrs = null;
133       try {
134         incRPCcallsMetrics();
135         long timestamp = System.currentTimeMillis();
136         if (useCallSeq) {
137           try {
138             rrs = server.next(scannerId, caching, callSeq);
139             // increment the callSeq which will be getting used for the next time next() call to
140             // the RS.In case of a timeout this increment should not happen so that the next
141             // trial also will be done with the same callSeq.
142             callSeq++;
143           } catch (IOException ioe) {
144             // TODO This is an ugly way of checking. Any other ways?
145             if (ioe instanceof RemoteException
146                 && ExceptionUtils.getStackTrace(ioe).contains("java.lang.NoSuchMethodException")) {
147               // This will happen when we use a latest version of the client but still running with
148               // old region server. At server side there is no implementation for the seq number
149               // based scanning. Set the useCallSeq to false.
150               LOG.warn("Seq number based scan API not present at RS side! Trying with API: "
151                   + "next(scannerId, caching). Consider upgrading version at RS "
152                   + location.getHostnamePort());
153               useCallSeq = false;
154               rrs = server.next(scannerId, caching);
155             } else {
156               // Throw it back so that will get handled by the below original catch blocks;
157               throw ioe;
158             }
159           }
160         } else {
161           rrs = server.next(scannerId, caching);
162         }
163         if (logScannerActivity) {
164           long now = System.currentTimeMillis();
165           if (now - timestamp > logCutOffLatency) {
166             int rows = rrs == null ? 0 : rrs.length;
167             LOG.info("Took " + (now-timestamp) + "ms to fetch "
168               + rows + " rows from scanner=" + scannerId);
169           }
170         }
171         updateResultsMetrics(rrs);
172       } catch (IOException e) {
173         if (logScannerActivity) {
174           LOG.info("Got exception in fetching from scanner="
175             + scannerId, e);
176         }
177         IOException ioe = null;
178         if (e instanceof RemoteException) {
179           ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
180         }
181         if (ioe == null) throw new IOException(e);
182         if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
183           try {
184             HRegionLocation location =
185               connection.relocateRegion(tableName, scan.getStartRow());
186             LOG.info("Scanner=" + scannerId
187               + " expired, current region location is " + location.toString()
188               + " ip:" + location.getServerAddress().getBindAddress());
189           } catch (Throwable t) {
190             LOG.info("Failed to relocate region", t);
191           }
192         }
193         if (ioe instanceof NotServingRegionException) {
194           // Throw a DNRE so that we break out of cycle of calling NSRE
195           // when what we need is to open scanner against new location.
196           // Attach NSRE to signal client that it needs to resetup scanner.
197           if (this.scanMetrics != null) {
198             this.scanMetrics.countOfNSRE.inc();
199           }
200           throw new DoNotRetryIOException("Reset scanner", ioe);
201         } else if (ioe instanceof RegionServerStoppedException) {
202           // Throw a DNRE so that we break out of cycle of calling RSSE
203           // when what we need is to open scanner against new location.
204           // Attach RSSE to signal client that it needs to resetup scanner.
205           throw new DoNotRetryIOException("Reset scanner", ioe);
206         } else {
207           // The outer layers will retry
208           throw ioe;
209         }
210       }
211       return rrs;
212     }
213     return null;
214   }
215 
216   private void incRPCcallsMetrics() {
217     if (this.scanMetrics == null) {
218       return;
219     }
220     this.scanMetrics.countOfRPCcalls.inc();
221     if (isRegionServerRemote) {
222       this.scanMetrics.countOfRemoteRPCcalls.inc();
223     }
224   }
225 
226   private void updateResultsMetrics(Result[] rrs) {
227     if (this.scanMetrics == null || rrs == null) {
228       return;
229     }
230     for (Result rr : rrs) {
231       this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
232       if (isRegionServerRemote) {
233         this.scanMetrics.countOfBytesInRemoteResults.inc(
234           rr.getBytes().getLength());
235       }
236     }
237   }
238 
239   private void close() {
240     if (this.scannerId == -1L) {
241       return;
242     }
243     try {
244       incRPCcallsMetrics();
245       this.server.close(this.scannerId);
246     } catch (IOException e) {
247       LOG.warn("Ignore, probably already closed", e);
248     }
249     this.scannerId = -1L;
250   }
251 
252   protected long openScanner() throws IOException {
253     incRPCcallsMetrics();
254     long id = this.server.openScanner(this.location.getRegionInfo().getRegionName(),
255        this.scan);
256     if (logScannerActivity) {
257       LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
258         + " on region " + this.location.toString() + " ip:"
259         + this.location.getServerAddress().getBindAddress());
260     }
261     return id;
262   }
263 
264   protected Scan getScan() {
265     return scan;
266   }
267 
268   /**
269    * Call this when the next invocation of call should close the scanner
270    */
271   public void setClose() {
272     this.closed = true;
273   }
274 
275   /**
276    * @return the HRegionInfo for the current region
277    */
278   public HRegionInfo getHRegionInfo() {
279     if (!instantiated) {
280       return null;
281     }
282     return location.getRegionInfo();
283   }
284 
285   /**
286    * Get the number of rows that will be fetched on next
287    * @return the number of rows for caching
288    */
289   public int getCaching() {
290     return caching;
291   }
292 
293   /**
294    * Set the number of rows that will be fetched on next
295    * @param caching the number of rows for caching
296    */
297   public void setCaching(int caching) {
298     this.caching = caching;
299   }
300 }