001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.DoNotRetryIOException;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionInfo;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.NotServingRegionException;
035import org.apache.hadoop.hbase.RegionLocations;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.TableNotEnabledException;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
041import org.apache.hadoop.hbase.exceptions.ScannerResetException;
042import org.apache.hadoop.hbase.ipc.HBaseRpcController;
043import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
044import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
052import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
055
056/**
057 * Scanner operations such as create, next, etc. Used by {@link ResultScanner}s made by
058 * {@link Table}. Passed to a retrying caller such as {@link RpcRetryingCaller} so fails are
059 * retried.
060 */
061@InterfaceAudience.Private
062public class ScannerCallable extends ClientServiceCallable<Result[]> {
063  public static final String LOG_SCANNER_LATENCY_CUTOFF = "hbase.client.log.scanner.latency.cutoff";
064  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
065
066  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
067  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
068  protected long scannerId = -1L;
069  protected boolean instantiated = false;
070  protected boolean closed = false;
071  protected boolean renew = false;
072  protected final Scan scan;
073  private int caching = 1;
074  protected ScanMetrics scanMetrics;
075  private boolean logScannerActivity = false;
076  private int logCutOffLatency = 1000;
077  protected final int id;
078
079  enum MoreResults {
080    YES,
081    NO,
082    UNKNOWN
083  }
084
  // Outcome of the most recent response for the current region; updated on every rpcCall().
  private MoreResults moreResultsInRegion;
  // Outcome of the most recent response for the scan as a whole; updated on every rpcCall().
  private MoreResults moreResultsForScan;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;

  // Cursor taken from a heartbeat response when the scan asked for cursor results; reset to null
  // by prepare().
  protected Cursor cursor;

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  // Sequence number sent with each "next" request; incremented after every successful call.
  private long nextCallSeq = 0;
  // Kept so that close() can create a fresh controller for the close request.
  protected final RpcControllerFactory rpcControllerFactory;
100
101  /**
102   * @param connection           which connection
103   * @param tableName            table callable is on
104   * @param scan                 the scan to execute
105   * @param scanMetrics          the ScanMetrics to used, if it is null, ScannerCallable won't
106   *                             collect metrics
107   * @param rpcControllerFactory factory to use when creating
108   *                             {@link com.google.protobuf.RpcController}
109   */
110  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
111    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
112    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
113      scan.getPriority());
114    this.id = id;
115    this.scan = scan;
116    this.scanMetrics = scanMetrics;
117    Configuration conf = connection.getConfiguration();
118    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
119    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
120    this.rpcControllerFactory = rpcControllerFactory;
121  }
122
123  protected final HRegionLocation getLocationForReplica(RegionLocations locs)
124    throws HBaseIOException {
125    HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
126    if (loc == null || loc.getServerName() == null) {
127      // With this exception, there will be a retry. The location can be null for a replica
128      // when the table is created or after a split.
129      throw new HBaseIOException("There is no location for replica id #" + id);
130    }
131    return loc;
132  }
133
134  /**
135   * Fetch region locations for the row. Since this is for prepare, we always useCache. This is
136   * because we can be sure that RpcRetryingCaller will have cleared the cache in error handling if
137   * this is a retry.
138   * @param row the row to look up region location for
139   */
140  protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws IOException {
141    // always use cache, because cache will have been cleared if necessary
142    // in the try/catch before retrying
143    return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
144      getTableName(), row);
145  }
146
147  /**
148   * @param reload force reload of server location
149   */
150  @Override
151  public void prepare(boolean reload) throws IOException {
152    if (Thread.interrupted()) {
153      throw new InterruptedIOException();
154    }
155
156    if (
157      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
158        && getConnection().isTableDisabled(getTableName())
159    ) {
160      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
161    }
162
163    RegionLocations rl = getRegionLocationsForPrepare(getRow());
164    location = getLocationForReplica(rl);
165    ServerName dest = location.getServerName();
166    setStub(super.getConnection().getClient(dest));
167    if (!instantiated || reload) {
168      checkIfRegionServerIsRemote();
169      instantiated = true;
170    }
171    cursor = null;
172    // check how often we retry.
173    if (reload) {
174      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
175    }
176  }
177
178  /**
179   * compare the local machine hostname with region server's hostname to decide if hbase client
180   * connects to a remote region server
181   */
182  protected void checkIfRegionServerIsRemote() {
183    isRegionServerRemote = isRemote(getLocation().getHostname());
184  }
185
186  private ScanResponse next() throws IOException {
187    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
188    setHeartbeatMessage(false);
189    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
190    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
191      this.scanMetrics != null, renew, scan.getLimit());
192    try {
193      ScanResponse response = getStub().scan(getRpcController(), request);
194      nextCallSeq++;
195      return response;
196    } catch (Exception e) {
197      IOException ioe = ProtobufUtil.handleRemoteException(e);
198      if (logScannerActivity) {
199        LOG.info(
200          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
201          e);
202      }
203      if (logScannerActivity) {
204        if (ioe instanceof UnknownScannerException) {
205          try {
206            HRegionLocation location =
207              getConnection().relocateRegion(getTableName(), scan.getStartRow());
208            LOG.info("Scanner=" + scannerId + " expired, current region location is "
209              + location.toString());
210          } catch (Throwable t) {
211            LOG.info("Failed to relocate region", t);
212          }
213        } else if (ioe instanceof ScannerResetException) {
214          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
215            + "asked us to reset the scanner state.", ioe);
216        }
217      }
218      // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
219      // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want
220      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
221      // a scan when doing a next in particular, we want to break out and get the scanner to
222      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
223      // yeah and hard to follow and in need of a refactor).
224      if (ioe instanceof NotServingRegionException) {
225        // Throw a DNRE so that we break out of cycle of calling NSRE
226        // when what we need is to open scanner against new location.
227        // Attach NSRE to signal client that it needs to re-setup scanner.
228        if (this.scanMetrics != null) {
229          this.scanMetrics.countOfNSRE.incrementAndGet();
230        }
231        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
232      } else if (ioe instanceof RegionServerStoppedException) {
233        // Throw a DNRE so that we break out of cycle of the retries and instead go and
234        // open scanner against new location.
235        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
236      } else {
237        // The outer layers will retry
238        throw ioe;
239      }
240    }
241  }
242
243  private void setAlreadyClosed() {
244    this.scannerId = -1L;
245    this.closed = true;
246  }
247
248  @Override
249  protected Result[] rpcCall() throws Exception {
250    if (Thread.interrupted()) {
251      throw new InterruptedIOException();
252    }
253    if (closed) {
254      close();
255      return null;
256    }
257    ScanResponse response;
258    if (this.scannerId == -1L) {
259      response = openScanner();
260    } else {
261      response = next();
262    }
263    long timestamp = EnvironmentEdgeManager.currentTime();
264    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
265    setHeartbeatMessage(isHeartBeat);
266    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
267      cursor = ProtobufUtil.toCursor(response.getCursor());
268    }
269    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
270    if (logScannerActivity) {
271      long now = EnvironmentEdgeManager.currentTime();
272      if (now - timestamp > logCutOffLatency) {
273        int rows = rrs == null ? 0 : rrs.length;
274        LOG.info(
275          "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
276      }
277    }
278    updateServerSideMetrics(scanMetrics, response);
279    // moreResults is only used for the case where a filter exhausts all elements
280    if (response.hasMoreResults()) {
281      if (response.getMoreResults()) {
282        setMoreResultsForScan(MoreResults.YES);
283      } else {
284        setMoreResultsForScan(MoreResults.NO);
285        setAlreadyClosed();
286      }
287    } else {
288      setMoreResultsForScan(MoreResults.UNKNOWN);
289    }
290    if (response.hasMoreResultsInRegion()) {
291      if (response.getMoreResultsInRegion()) {
292        setMoreResultsInRegion(MoreResults.YES);
293      } else {
294        setMoreResultsInRegion(MoreResults.NO);
295        setAlreadyClosed();
296      }
297    } else {
298      setMoreResultsInRegion(MoreResults.UNKNOWN);
299    }
300    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
301    return rrs;
302  }
303
304  /**
305   * @return true when the most recent RPC response indicated that the response was a heartbeat
306   *         message. Heartbeat messages are sent back from the server when the processing of the
307   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
308   *         timeouts during long running scan operations.
309   */
310  boolean isHeartbeatMessage() {
311    return heartbeatMessage;
312  }
313
314  public Cursor getCursor() {
315    return cursor;
316  }
317
318  private void setHeartbeatMessage(boolean heartbeatMessage) {
319    this.heartbeatMessage = heartbeatMessage;
320  }
321
322  private void close() {
323    if (this.scannerId == -1L) {
324      return;
325    }
326    try {
327      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
328      ScanRequest request =
329        RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
330      HBaseRpcController controller = rpcControllerFactory.newController();
331
332      // Set fields from the original controller onto the close-specific controller
333      // We set the timeout and the priority -- we will overwrite the priority to HIGH
334      // below, but the controller will take whichever is highest.
335      if (getRpcController() instanceof HBaseRpcController) {
336        HBaseRpcController original = (HBaseRpcController) getRpcController();
337        controller.setPriority(original.getPriority());
338        if (original.hasCallTimeout()) {
339          controller.setCallTimeout(original.getCallTimeout());
340        }
341      }
342      controller.setPriority(HConstants.HIGH_QOS);
343
344      try {
345        getStub().scan(controller, request);
346      } catch (Exception e) {
347        throw ProtobufUtil.handleRemoteException(e);
348      }
349    } catch (IOException e) {
350      TableName table = getTableName();
351      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
352      LOG.warn(
353        "Ignore, probably already closed. Current scan: " + getScan().toString() + tableDetails, e);
354    }
355    this.scannerId = -1L;
356  }
357
358  private ScanResponse openScanner() throws IOException {
359    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
360    ScanRequest request = RequestConverter.buildScanRequest(
361      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
362    try {
363      ScanResponse response = getStub().scan(getRpcController(), request);
364      long id = response.getScannerId();
365      if (logScannerActivity) {
366        LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region "
367          + getLocation().toString());
368      }
369      if (response.hasMvccReadPoint()) {
370        this.scan.setMvccReadPoint(response.getMvccReadPoint());
371      }
372      this.scannerId = id;
373      return response;
374    } catch (Exception e) {
375      throw ProtobufUtil.handleRemoteException(e);
376    }
377  }
378
379  protected Scan getScan() {
380    return scan;
381  }
382
383  /**
384   * Call this when the next invocation of call should close the scanner
385   */
386  public void setClose() {
387    this.closed = true;
388  }
389
390  /**
391   * Indicate whether we make a call only to renew the lease, but without affected the scanner in
392   * any other way.
393   * @param val true if only the lease should be renewed
394   */
395  public void setRenew(boolean val) {
396    this.renew = val;
397  }
398
399  /** Returns the HRegionInfo for the current region */
400  @Override
401  public HRegionInfo getHRegionInfo() {
402    if (!instantiated) {
403      return null;
404    }
405    return getLocation().getRegionInfo();
406  }
407
408  /**
409   * Get the number of rows that will be fetched on next
410   * @return the number of rows for caching
411   */
412  public int getCaching() {
413    return caching;
414  }
415
416  /**
417   * Set the number of rows that will be fetched on next
418   * @param caching the number of rows for caching
419   */
420  public void setCaching(int caching) {
421    this.caching = caching;
422  }
423
424  public ScannerCallable getScannerCallableForReplica(int id) {
425    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), this.getScan(),
426      this.scanMetrics, this.rpcControllerFactory, id);
427    s.setCaching(this.caching);
428    return s;
429  }
430
431  /**
432   * Should the client attempt to fetch more results from this region
433   */
434  MoreResults moreResultsInRegion() {
435    return moreResultsInRegion;
436  }
437
438  void setMoreResultsInRegion(MoreResults moreResults) {
439    this.moreResultsInRegion = moreResults;
440  }
441
442  /**
443   * Should the client attempt to fetch more results for the whole scan.
444   */
445  MoreResults moreResultsForScan() {
446    return moreResultsForScan;
447  }
448
449  void setMoreResultsForScan(MoreResults moreResults) {
450    this.moreResultsForScan = moreResults;
451  }
452}