001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.ArrayList;
028import java.util.List;
029
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.RegionLocations;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.util.Bytes;
039
040
041/**
042 * A reversed ScannerCallable which supports backward scanning.
043 */
044@InterfaceAudience.Private
045public class ReversedScannerCallable extends ScannerCallable {
046
047  /**
048   * @param connection which connection
049   * @param tableName table callable is on
050   * @param scan the scan to execute
051   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
052   *          metrics
053   * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
054   *          regionserver
055   * @param replicaId the replica id
056   */
057  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
058    ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
059    super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
060  }
061
062  /**
063   * @param reload force reload of server location
064   */
065  @Override
066  public void prepare(boolean reload) throws IOException {
067    if (Thread.interrupted()) {
068      throw new InterruptedIOException();
069    }
070    if (!instantiated || reload) {
071      // we should use range locate if
072      // 1. we do not want the start row
073      // 2. the start row is empty which means we need to locate to the last region.
074      if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
075        // Just locate the region with the row
076        RegionLocations rl = getRegionLocations(reload, getRow());
077        this.location = getLocationForReplica(rl);
078        if (location == null || location.getServerName() == null) {
079          throw new IOException("Failed to find location, tableName="
080              + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
081              + reload);
082        }
083      } else {
084        // Need to locate the regions with the range, and the target location is
085        // the last one which is the previous region of last region scanner
086        byte[] locateStartRow = createCloseRowBefore(getRow());
087        List<HRegionLocation> locatedRegions = locateRegionsInRange(
088            locateStartRow, getRow(), reload);
089        if (locatedRegions.isEmpty()) {
090          throw new DoNotRetryIOException(
091              "Does hbase:meta exist hole? Couldn't get regions for the range from "
092                  + Bytes.toStringBinary(locateStartRow) + " to "
093                  + Bytes.toStringBinary(getRow()));
094        }
095        this.location = locatedRegions.get(locatedRegions.size() - 1);
096      }
097      setStub(getConnection().getClient(getLocation().getServerName()));
098      checkIfRegionServerIsRemote();
099      instantiated = true;
100    }
101
102    // check how often we retry.
103    if (reload) {
104      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
105    }
106  }
107
108  /**
109   * Get the corresponding regions for an arbitrary range of keys.
110   * @param startKey Starting row in range, inclusive
111   * @param endKey Ending row in range, exclusive
112   * @param reload force reload of server location
113   * @return A list of HRegionLocation corresponding to the regions that contain
114   *         the specified range
115   */
116  private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
117      byte[] endKey, boolean reload) throws IOException {
118    final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
119        HConstants.EMPTY_END_ROW);
120    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
121      throw new IllegalArgumentException("Invalid range: "
122          + Bytes.toStringBinary(startKey) + " > "
123          + Bytes.toStringBinary(endKey));
124    }
125    List<HRegionLocation> regionList = new ArrayList<>();
126    byte[] currentKey = startKey;
127    do {
128      RegionLocations rl = getRegionLocations(reload, currentKey);
129      HRegionLocation regionLocation = getLocationForReplica(rl);
130      if (regionLocation.getRegionInfo().containsRow(currentKey)) {
131        regionList.add(regionLocation);
132      } else {
133        throw new DoNotRetryIOException(
134          "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) +
135            " returns incorrect region " + regionLocation.getRegionInfo());
136      }
137      currentKey = regionLocation.getRegionInfo().getEndKey();
138    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
139        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
140    return regionList;
141  }
142
143  @Override
144  public ScannerCallable getScannerCallableForReplica(int id) {
145    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
146        this.getScan(), this.scanMetrics, rpcControllerFactory, id);
147    r.setCaching(this.getCaching());
148    return r;
149  }
150}