001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.RegionLocations;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.TableNotEnabledException;
032import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
033import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * A reversed ScannerCallable which supports backward scanning.
040 */
041@InterfaceAudience.Private
042public class ReversedScannerCallable extends ScannerCallable {
043
044  private byte[] locationSearchKey;
045
046  /**
047   * @param connection  which connection
048   * @param tableName   table callable is on
049   * @param scan        the scan to execute
050   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
051   *                    metrics
052   * @param rpcFactory  to create an {@link com.google.protobuf.RpcController} to talk to the
053   *                    regionserver
054   * @param replicaId   the replica id
055   */
056  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
057    ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
058    super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
059  }
060
061  @Override
062  public void throwable(Throwable t, boolean retrying) {
063    // for reverse scans, we need to update cache using the search key found for the reverse scan
064    // range in prepare. Otherwise, we will see weird behavior at the table boundaries,
065    // when trying to clear cache for an empty row.
066    if (location != null && locationSearchKey != null) {
067      getConnection().updateCachedLocations(getTableName(),
068        location.getRegionInfo().getRegionName(), locationSearchKey, t, location.getServerName());
069    }
070  }
071
072  /**
073   * @param reload force reload of server location
074   */
075  @Override
076  public void prepare(boolean reload) throws IOException {
077    if (Thread.interrupted()) {
078      throw new InterruptedIOException();
079    }
080
081    if (
082      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
083        && getConnection().isTableDisabled(getTableName())
084    ) {
085      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
086    }
087
088    if (!instantiated || reload) {
089      // we should use range locate if
090      // 1. we do not want the start row
091      // 2. the start row is empty which means we need to locate to the last region.
092      if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
093        // Just locate the region with the row
094        RegionLocations rl = getRegionLocationsForPrepare(getRow());
095        this.location = getLocationForReplica(rl);
096        this.locationSearchKey = getRow();
097      } else {
098        // The locateStart row is an approximation. So we need to search between
099        // that and the actual row in order to really find the last region
100        byte[] locateStartRow = createCloseRowBefore(getRow());
101        Pair<HRegionLocation, byte[]> lastRegionAndKey =
102          locateLastRegionInRange(locateStartRow, getRow());
103        this.location = lastRegionAndKey.getFirst();
104        this.locationSearchKey = lastRegionAndKey.getSecond();
105      }
106
107      if (location == null || location.getServerName() == null) {
108        throw new IOException("Failed to find location, tableName=" + getTableName() + ", row="
109          + Bytes.toStringBinary(getRow()) + ", reload=" + reload);
110      }
111
112      setStub(getConnection().getClient(getLocation().getServerName()));
113      checkIfRegionServerIsRemote();
114      instantiated = true;
115    }
116
117    // check how often we retry.
118    if (reload) {
119      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
120    }
121  }
122
123  /**
124   * Get the last region before the endkey, which will be used to execute the reverse scan
125   * @param startKey Starting row in range, inclusive
126   * @param endKey   Ending row in range, exclusive
127   * @return The last location, and the rowKey used to find it. May be null, if a region could not
128   *         be found.
129   */
130  private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
131    throws IOException {
132    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
133    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
134      throw new IllegalArgumentException(
135        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
136    }
137
138    HRegionLocation lastRegion = null;
139    byte[] lastFoundKey = null;
140    byte[] currentKey = startKey;
141
142    do {
143      RegionLocations rl = getRegionLocationsForPrepare(currentKey);
144      HRegionLocation regionLocation = getLocationForReplica(rl);
145      if (regionLocation.getRegionInfo().containsRow(currentKey)) {
146        lastFoundKey = currentKey;
147        lastRegion = regionLocation;
148      } else {
149        throw new DoNotRetryIOException(
150          "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey)
151            + " returns incorrect region " + regionLocation.getRegionInfo());
152      }
153      currentKey = regionLocation.getRegionInfo().getEndKey();
154    } while (
155      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
156        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)
157    );
158
159    return new Pair<>(lastRegion, lastFoundKey);
160  }
161
162  @Override
163  public ScannerCallable getScannerCallableForReplica(int id) {
164    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
165      this.getScan(), this.scanMetrics, rpcControllerFactory, id);
166    r.setCaching(this.getCaching());
167    return r;
168  }
169}