001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.util.ArrayList;
028import java.util.List;
029
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.RegionLocations;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.util.Bytes;
039
040
041/**
042 * A reversed ScannerCallable which supports backward scanning.
043 */
044@InterfaceAudience.Private
045public class ReversedScannerCallable extends ScannerCallable {
046
047  /**
048   * @param connection
049   * @param tableName
050   * @param scan
051   * @param scanMetrics
052   * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
053   *          regionserver
054   */
055  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
056      ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) {
057    super(connection, tableName, scan, scanMetrics, rpcFactory);
058  }
059
060  /**
061   * @param connection
062   * @param tableName
063   * @param scan
064   * @param scanMetrics
065   * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
066   *          regionserver
067   * @param replicaId the replica id
068   */
069  public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
070      ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
071    super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
072  }
073
074  /**
075   * @param reload force reload of server location
076   * @throws IOException
077   */
078  @Override
079  public void prepare(boolean reload) throws IOException {
080    if (Thread.interrupted()) {
081      throw new InterruptedIOException();
082    }
083    if (!instantiated || reload) {
084      // we should use range locate if
085      // 1. we do not want the start row
086      // 2. the start row is empty which means we need to locate to the last region.
087      if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
088        // Just locate the region with the row
089        RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
090            getConnection(), getTableName(), getRow());
091        this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
092        if (location == null || location.getServerName() == null) {
093          throw new IOException("Failed to find location, tableName="
094              + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
095              + reload);
096        }
097      } else {
098        // Need to locate the regions with the range, and the target location is
099        // the last one which is the previous region of last region scanner
100        byte[] locateStartRow = createCloseRowBefore(getRow());
101        List<HRegionLocation> locatedRegions = locateRegionsInRange(
102            locateStartRow, getRow(), reload);
103        if (locatedRegions.isEmpty()) {
104          throw new DoNotRetryIOException(
105              "Does hbase:meta exist hole? Couldn't get regions for the range from "
106                  + Bytes.toStringBinary(locateStartRow) + " to "
107                  + Bytes.toStringBinary(getRow()));
108        }
109        this.location = locatedRegions.get(locatedRegions.size() - 1);
110      }
111      setStub(getConnection().getClient(getLocation().getServerName()));
112      checkIfRegionServerIsRemote();
113      instantiated = true;
114    }
115
116    // check how often we retry.
117    if (reload) {
118      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
119    }
120  }
121
122  /**
123   * Get the corresponding regions for an arbitrary range of keys.
124   * @param startKey Starting row in range, inclusive
125   * @param endKey Ending row in range, exclusive
126   * @param reload force reload of server location
127   * @return A list of HRegionLocation corresponding to the regions that contain
128   *         the specified range
129   * @throws IOException
130   */
131  private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
132      byte[] endKey, boolean reload) throws IOException {
133    final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
134        HConstants.EMPTY_END_ROW);
135    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
136      throw new IllegalArgumentException("Invalid range: "
137          + Bytes.toStringBinary(startKey) + " > "
138          + Bytes.toStringBinary(endKey));
139    }
140    List<HRegionLocation> regionList = new ArrayList<>();
141    byte[] currentKey = startKey;
142    do {
143      RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
144          getConnection(), getTableName(), currentKey);
145      HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
146      if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
147        regionList.add(regionLocation);
148      } else {
149        throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
150            + Bytes.toStringBinary(currentKey) + " returns incorrect region "
151            + (regionLocation == null ? null : regionLocation.getRegionInfo()));
152      }
153      currentKey = regionLocation.getRegionInfo().getEndKey();
154    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
155        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
156    return regionList;
157  }
158
159  @Override
160  public ScannerCallable getScannerCallableForReplica(int id) {
161    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
162        this.getScan(), this.scanMetrics, rpcControllerFactory, id);
163    r.setCaching(this.getCaching());
164    return r;
165  }
166}