001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
026
027import java.io.IOException;
028import java.io.InterruptedIOException;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.RegionLocations;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
044import org.apache.hadoop.hbase.exceptions.ScannerResetException;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
052
053/**
054 * Scanner operations such as create, next, etc.
055 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
056 * {@link RpcRetryingCaller} so fails are retried.
057 */
058@InterfaceAudience.Private
059public class ScannerCallable extends ClientServiceCallable<Result[]> {
  /**
   * Configuration key for the scanner latency cutoff, in milliseconds. The constructor reads it
   * (default 1000); when scanner activity logging is on, {@code rpcCall} logs a message if its
   * measured elapsed time exceeds this value.
   */
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  /**
   * Configuration key that turns scanner activity logging on or off. The constructor reads it
   * with a default of false.
   */
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
  // Id of the scanner opened on the server; -1 means no scanner is currently open.
  protected long scannerId = -1L;
  // Set to true by prepare(); getHRegionInfo() returns null until then.
  protected boolean instantiated = false;
  // When true, the next rpcCall() closes the scanner and returns null.
  protected boolean closed = false;
  // Passed into each "next" scan request; see setRenew(boolean).
  protected boolean renew = false;
  // The scan being executed.
  protected final Scan scan;
  // Number of rows requested per call; passed into the open and next scan requests.
  private int caching = 1;
  // May be null, in which case no metrics are collected.
  protected ScanMetrics scanMetrics;
  // Value of LOG_SCANNER_ACTIVITY read from the connection configuration.
  private boolean logScannerActivity = false;
  // Value of LOG_SCANNER_LATENCY_CUTOFF (milliseconds) read from the connection configuration.
  private int logCutOffLatency = 1000;
  // Replica id; prepare() uses it to pick the region location to talk to.
  protected final int id;

  // Tri-state answer; UNKNOWN is used when the server response does not carry the field.
  enum MoreResults {
    YES, NO, UNKNOWN
  }

  // Set from the most recent response in rpcCall().
  private MoreResults moreResultsInRegion;
  private MoreResults moreResultsForScan;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;

  // Cursor taken from a heartbeat response when the scan asks for cursor results; cleared by
  // prepare().
  protected Cursor cursor;

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  // Sequence number sent with each "next" request; incremented after every successful one.
  private long nextCallSeq = 0;
  // Kept so that callables for other replicas can be created with the same factory.
  protected final RpcControllerFactory rpcControllerFactory;
096
097  /**
098   * @param connection which connection
099   * @param tableName table callable is on
100   * @param scan the scan to execute
101   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
102   *          metrics
103   * @param rpcControllerFactory factory to use when creating
104   *        {@link com.google.protobuf.RpcController}
105   */
106  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
107      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
108    this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
109  }
110  /**
111   *
112   * @param connection
113   * @param tableName
114   * @param scan
115   * @param scanMetrics
116   * @param id the replicaId
117   */
118  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
119      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
120    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
121    this.id = id;
122    this.scan = scan;
123    this.scanMetrics = scanMetrics;
124    Configuration conf = connection.getConfiguration();
125    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
126    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
127    this.rpcControllerFactory = rpcControllerFactory;
128  }
129
130  /**
131   * @param reload force reload of server location
132   * @throws IOException
133   */
134  @Override
135  public void prepare(boolean reload) throws IOException {
136    if (Thread.interrupted()) {
137      throw new InterruptedIOException();
138    }
139    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
140        id, getConnection(), getTableName(), getRow());
141    location = id < rl.size() ? rl.getRegionLocation(id) : null;
142    if (location == null || location.getServerName() == null) {
143      // With this exception, there will be a retry. The location can be null for a replica
144      //  when the table is created or after a split.
145      throw new HBaseIOException("There is no location for replica id #" + id);
146    }
147    ServerName dest = location.getServerName();
148    setStub(super.getConnection().getClient(dest));
149    if (!instantiated || reload) {
150      checkIfRegionServerIsRemote();
151      instantiated = true;
152    }
153    cursor = null;
154    // check how often we retry.
155    if (reload) {
156      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
157    }
158  }
159
160  /**
161   * compare the local machine hostname with region server's hostname to decide if hbase client
162   * connects to a remote region server
163   */
164  protected void checkIfRegionServerIsRemote() {
165    isRegionServerRemote = isRemote(getLocation().getHostname());
166  }
167
168  private ScanResponse next() throws IOException {
169    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
170    setHeartbeatMessage(false);
171    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
172    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
173      this.scanMetrics != null, renew, scan.getLimit());
174    try {
175      ScanResponse response = getStub().scan(getRpcController(), request);
176      nextCallSeq++;
177      return response;
178    } catch (Exception e) {
179      IOException ioe = ProtobufUtil.handleRemoteException(e);
180      if (logScannerActivity) {
181        LOG.info(
182          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
183          e);
184      }
185      if (logScannerActivity) {
186        if (ioe instanceof UnknownScannerException) {
187          try {
188            HRegionLocation location =
189                getConnection().relocateRegion(getTableName(), scan.getStartRow());
190            LOG.info("Scanner=" + scannerId + " expired, current region location is "
191                + location.toString());
192          } catch (Throwable t) {
193            LOG.info("Failed to relocate region", t);
194          }
195        } else if (ioe instanceof ScannerResetException) {
196          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
197              + "asked us to reset the scanner state.",
198            ioe);
199        }
200      }
201      // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
202      // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want
203      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
204      // a scan when doing a next in particular, we want to break out and get the scanner to
205      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
206      // yeah and hard to follow and in need of a refactor).
207      if (ioe instanceof NotServingRegionException) {
208        // Throw a DNRE so that we break out of cycle of calling NSRE
209        // when what we need is to open scanner against new location.
210        // Attach NSRE to signal client that it needs to re-setup scanner.
211        if (this.scanMetrics != null) {
212          this.scanMetrics.countOfNSRE.incrementAndGet();
213        }
214        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
215      } else if (ioe instanceof RegionServerStoppedException) {
216        // Throw a DNRE so that we break out of cycle of the retries and instead go and
217        // open scanner against new location.
218        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
219      } else {
220        // The outer layers will retry
221        throw ioe;
222      }
223    }
224  }
225
226  private void setAlreadyClosed() {
227    this.scannerId = -1L;
228    this.closed = true;
229  }
230
231  @Override
232  protected Result[] rpcCall() throws Exception {
233    if (Thread.interrupted()) {
234      throw new InterruptedIOException();
235    }
236    if (closed) {
237      close();
238      return null;
239    }
240    ScanResponse response;
241    if (this.scannerId == -1L) {
242      response = openScanner();
243    } else {
244      response = next();
245    }
246    long timestamp = System.currentTimeMillis();
247    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
248    setHeartbeatMessage(isHeartBeat);
249    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
250      cursor = ProtobufUtil.toCursor(response.getCursor());
251    }
252    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
253    if (logScannerActivity) {
254      long now = System.currentTimeMillis();
255      if (now - timestamp > logCutOffLatency) {
256        int rows = rrs == null ? 0 : rrs.length;
257        LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner="
258            + scannerId);
259      }
260    }
261    updateServerSideMetrics(scanMetrics, response);
262    // moreResults is only used for the case where a filter exhausts all elements
263    if (response.hasMoreResults()) {
264      if (response.getMoreResults()) {
265        setMoreResultsForScan(MoreResults.YES);
266      } else {
267        setMoreResultsForScan(MoreResults.NO);
268        setAlreadyClosed();
269      }
270    } else {
271      setMoreResultsForScan(MoreResults.UNKNOWN);
272    }
273    if (response.hasMoreResultsInRegion()) {
274      if (response.getMoreResultsInRegion()) {
275        setMoreResultsInRegion(MoreResults.YES);
276      } else {
277        setMoreResultsInRegion(MoreResults.NO);
278        setAlreadyClosed();
279      }
280    } else {
281      setMoreResultsInRegion(MoreResults.UNKNOWN);
282    }
283    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
284    return rrs;
285  }
286
287  /**
288   * @return true when the most recent RPC response indicated that the response was a heartbeat
289   *         message. Heartbeat messages are sent back from the server when the processing of the
290   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
291   *         timeouts during long running scan operations.
292   */
293  boolean isHeartbeatMessage() {
294    return heartbeatMessage;
295  }
296
297  public Cursor getCursor() {
298    return cursor;
299  }
300
301  private void setHeartbeatMessage(boolean heartbeatMessage) {
302    this.heartbeatMessage = heartbeatMessage;
303  }
304
305  private void close() {
306    if (this.scannerId == -1L) {
307      return;
308    }
309    try {
310      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
311      ScanRequest request =
312          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
313      try {
314        getStub().scan(getRpcController(), request);
315      } catch (Exception e) {
316        throw ProtobufUtil.handleRemoteException(e);
317      }
318    } catch (IOException e) {
319      TableName table = getTableName();
320      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
321      LOG.warn("Ignore, probably already closed. Current scan: " + getScan().toString()
322          + tableDetails, e);
323    }
324    this.scannerId = -1L;
325  }
326
327  private ScanResponse openScanner() throws IOException {
328    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
329    ScanRequest request = RequestConverter.buildScanRequest(
330      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
331    try {
332      ScanResponse response = getStub().scan(getRpcController(), request);
333      long id = response.getScannerId();
334      if (logScannerActivity) {
335        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
336          + " on region " + getLocation().toString());
337      }
338      if (response.hasMvccReadPoint()) {
339        this.scan.setMvccReadPoint(response.getMvccReadPoint());
340      }
341      this.scannerId = id;
342      return response;
343    } catch (Exception e) {
344      throw ProtobufUtil.handleRemoteException(e);
345    }
346  }
347
348  protected Scan getScan() {
349    return scan;
350  }
351
352  /**
353   * Call this when the next invocation of call should close the scanner
354   */
355  public void setClose() {
356    this.closed = true;
357  }
358
359  /**
360   * Indicate whether we make a call only to renew the lease, but without affected the scanner in
361   * any other way.
362   * @param val true if only the lease should be renewed
363   */
364  public void setRenew(boolean val) {
365    this.renew = val;
366  }
367
368  /**
369   * @return the HRegionInfo for the current region
370   */
371  @Override
372  public HRegionInfo getHRegionInfo() {
373    if (!instantiated) {
374      return null;
375    }
376    return getLocation().getRegionInfo();
377  }
378
379  /**
380   * Get the number of rows that will be fetched on next
381   * @return the number of rows for caching
382   */
383  public int getCaching() {
384    return caching;
385  }
386
387  /**
388   * Set the number of rows that will be fetched on next
389   * @param caching the number of rows for caching
390   */
391  public void setCaching(int caching) {
392    this.caching = caching;
393  }
394
395  public ScannerCallable getScannerCallableForReplica(int id) {
396    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(),
397        this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
398    s.setCaching(this.caching);
399    return s;
400  }
401
402  /**
403   * Should the client attempt to fetch more results from this region
404   */
405  MoreResults moreResultsInRegion() {
406    return moreResultsInRegion;
407  }
408
409  void setMoreResultsInRegion(MoreResults moreResults) {
410    this.moreResultsInRegion = moreResults;
411  }
412
413  /**
414   * Should the client attempt to fetch more results for the whole scan.
415   */
416  MoreResults moreResultsForScan() {
417    return moreResultsForScan;
418  }
419
420  void setMoreResultsForScan(MoreResults moreResults) {
421    this.moreResultsForScan = moreResults;
422  }
423}