View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.DoNotRetryIOException;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.HRegionLocation;
31  import org.apache.hadoop.hbase.RegionLocations;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  
39  /**
40   * A reversed ScannerCallable which supports backward scanning.
41   */
42  @InterfaceAudience.Private
43  public class ReversedScannerCallable extends ScannerCallable {
44    /**
45     * The start row for locating regions. In reversed scanner, may locate the
46     * regions for a range of keys when doing
47     * {@link ReversedClientScanner#nextScanner(int, boolean)}
48     */
49    protected final byte[] locateStartRow;
50  
51    /**
52     * @param connection
53     * @param tableName
54     * @param scan
55     * @param scanMetrics
56     * @param locateStartRow The start row for locating regions
57     * @param rpcFactory to create an 
58     * {@link com.google.protobuf.RpcController} to talk to the regionserver
59     */
60    public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
61        ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
62      super(connection, tableName, scan, scanMetrics, rpcFactory);
63      this.locateStartRow = locateStartRow;
64    }
65  
66    /**
67     * @param connection
68     * @param tableName
69     * @param scan
70     * @param scanMetrics
71     * @param locateStartRow The start row for locating regions
72     * @param rpcFactory to create an 
73     * {@link com.google.protobuf.RpcController} to talk to the regionserver
74     * @param replicaId the replica id
75     */
76    public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
77        ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, int replicaId) {
78      super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
79      this.locateStartRow = locateStartRow;
80    }
81  
82    /**
83     * @deprecated use
84     *             {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory )}
85     */
86    @Deprecated
87    public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
88        Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
89      this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
90          .instantiate(connection.getConfiguration()));
91    }
92  
93    /**
94     * @param reload force reload of server location
95     * @throws IOException
96     */
97    @Override
98    public void prepare(boolean reload) throws IOException {
99      if (Thread.interrupted()) {
100       throw new InterruptedIOException();
101     }
102     if (!instantiated || reload) {
103       if (locateStartRow == null) {
104         // Just locate the region with the row
105         RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
106             getConnection(), tableName, row);
107         this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
108         if (this.location == null) {
109           throw new IOException("Failed to find location, tableName="
110               + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
111               + reload);
112         }
113       } else {
114         // Need to locate the regions with the range, and the target location is
115         // the last one which is the previous region of last region scanner
116         List<HRegionLocation> locatedRegions = locateRegionsInRange(
117             locateStartRow, row, reload);
118         if (locatedRegions.isEmpty()) {
119           throw new DoNotRetryIOException(
120               "Does hbase:meta exist hole? Couldn't get regions for the range from "
121                   + Bytes.toStringBinary(locateStartRow) + " to "
122                   + Bytes.toStringBinary(row));
123         }
124         this.location = locatedRegions.get(locatedRegions.size() - 1);
125       }
126       setStub(getConnection().getClient(getLocation().getServerName()));
127       checkIfRegionServerIsRemote();
128       instantiated = true;
129     }
130 
131     // check how often we retry.
132     // HConnectionManager will call instantiateServer with reload==true
133     // if and only if for retries.
134     if (reload && this.scanMetrics != null) {
135       this.scanMetrics.countOfRPCRetries.incrementAndGet();
136       if (isRegionServerRemote) {
137         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
138       }
139     }
140   }
141 
142   /**
143    * Get the corresponding regions for an arbitrary range of keys.
144    * @param startKey Starting row in range, inclusive
145    * @param endKey Ending row in range, exclusive
146    * @param reload force reload of server location
147    * @return A list of HRegionLocation corresponding to the regions that contain
148    *         the specified range
149    * @throws IOException
150    */
151   private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
152       byte[] endKey, boolean reload) throws IOException {
153     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
154         HConstants.EMPTY_END_ROW);
155     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
156       throw new IllegalArgumentException("Invalid range: "
157           + Bytes.toStringBinary(startKey) + " > "
158           + Bytes.toStringBinary(endKey));
159     }
160     List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
161     byte[] currentKey = startKey;
162     do {
163       RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
164           getConnection(), tableName, currentKey);
165       HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
166       if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
167         regionList.add(regionLocation);
168       } else {
169         throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
170             + Bytes.toStringBinary(currentKey) + " returns incorrect region "
171             + regionLocation.getRegionInfo());
172       }
173       currentKey = regionLocation.getRegionInfo().getEndKey();
174     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
175         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
176     return regionList;
177   }
178 
179   @Override
180   public ScannerCallable getScannerCallableForReplica(int id) {
181     ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
182         this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
183     r.setCaching(this.getCaching());
184     return r;
185   }
186 }