View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.util.concurrent.ExecutorService;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.ExceptionUtil;
33  
34  /**
35   * A reversed client scanner which support backward scanning
36   */
37  @InterfaceAudience.Private
38  public class ReversedClientScanner extends ClientScanner {
39    private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
40  
41    /**
42     * Create a new ReversibleClientScanner for the specified table Note that the
43     * passed {@link Scan}'s start row maybe changed.
44     * @param conf
45     * @param scan
46     * @param tableName
47     * @param connection
48     * @param pool
49     * @param primaryOperationTimeout
50     * @throws IOException
51     */
52    public ReversedClientScanner(Configuration conf, Scan scan,
53        TableName tableName, ClusterConnection connection,
54        RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
55        ExecutorService pool, int primaryOperationTimeout) throws IOException {
56      super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
57          primaryOperationTimeout);
58    }
59  
60    @Override
61    protected boolean nextScanner(int nbRows, final boolean done)
62        throws IOException {
63      // Close the previous scanner if it's open
64      if (this.callable != null) {
65        this.callable.setClose();
66        // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
67        // we do a callWithRetries
68        this.caller.callWithoutRetries(callable, scannerTimeout);
69        this.callable = null;
70      }
71  
72      // Where to start the next scanner
73      byte[] localStartKey;
74      boolean locateTheClosestFrontRow = true;
75      // if we're at start of table, close and return false to stop iterating
76      if (this.currentRegion != null) {
77        byte[] startKey = this.currentRegion.getStartKey();
78        if (startKey == null
79            || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
80            || checkScanStopRow(startKey) || done) {
81          close();
82          if (LOG.isDebugEnabled()) {
83            LOG.debug("Finished " + this.currentRegion);
84          }
85          return false;
86        }
87        localStartKey = startKey;
88        if (LOG.isDebugEnabled()) {
89          LOG.debug("Finished " + this.currentRegion);
90        }
91      } else {
92        localStartKey = this.scan.getStartRow();
93        if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) {
94          locateTheClosestFrontRow = false;
95        }
96      }
97  
98      if (LOG.isDebugEnabled() && this.currentRegion != null) {
99        // Only worth logging if NOT first region in scan.
100       LOG.debug("Advancing internal scanner to startKey at '"
101           + Bytes.toStringBinary(localStartKey) + "'");
102     }
103     try {
104       // In reversed scan, we want to locate the previous region through current
105       // region's start key. In order to get that previous region, first we
106       // create a closest row before the start key of current region, then
107       // locate all the regions from the created closest row to start key of
108       // current region, thus the last one of located regions should be the
109       // previous region of current region. The related logic of locating
110       // regions is implemented in ReversedScannerCallable
111       byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
112           : null;
113       callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
114       // Open a scanner on the region server starting at the
115       // beginning of the region
116       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
117       // we do a callWithRetries
118       this.caller.callWithoutRetries(callable, scannerTimeout);
119       this.currentRegion = callable.getHRegionInfo();
120       if (this.scanMetrics != null) {
121         this.scanMetrics.countOfRegions.incrementAndGet();
122       }
123     } catch (IOException e) {
124       ExceptionUtil.rethrowIfInterrupt(e);
125       close();
126       throw e;
127     }
128     return true;
129   }
130 
131   protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,
132       int nbRows, byte[] locateStartRow) {
133     scan.setStartRow(localStartKey);
134     ScannerCallable s =
135         new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
136             locateStartRow, this.rpcControllerFactory);
137     s.setCaching(nbRows);
138     ScannerCallableWithReplicas sr =
139         new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool,
140             primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(),
141             caller);
142     return sr;
143   }
144 
145   @Override
146   // returns true if stopRow >= passed region startKey
147   protected boolean checkScanStopRow(final byte[] startKey) {
148     if (this.scan.getStopRow().length > 0) {
149       // there is a stop row, check to see if we are past it.
150       byte[] stopRow = scan.getStopRow();
151       int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0,
152           startKey.length);
153       if (cmp >= 0) {
154         // stopRow >= startKey (stopRow is equals to or larger than endKey)
155         // This is a stop.
156         return true;
157       }
158     }
159     return false; // unlikely.
160   }
161 }