1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.util.concurrent.ExecutorService;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.hbase.util.ExceptionUtil;
33
34
35
36
37 @InterfaceAudience.Private
38 public class ReversedClientScanner extends ClientScanner {
39 private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
40
41
42
43
44
45
46
47
48
49
50
51
52 public ReversedClientScanner(Configuration conf, Scan scan,
53 TableName tableName, ClusterConnection connection,
54 RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
55 ExecutorService pool, int primaryOperationTimeout) throws IOException {
56 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
57 primaryOperationTimeout);
58 }
59
60 @Override
61 protected boolean nextScanner(int nbRows, final boolean done)
62 throws IOException {
63
64 if (this.callable != null) {
65 this.callable.setClose();
66
67
68 this.caller.callWithoutRetries(callable, scannerTimeout);
69 this.callable = null;
70 }
71
72
73 byte[] localStartKey;
74 boolean locateTheClosestFrontRow = true;
75
76 if (this.currentRegion != null) {
77 byte[] startKey = this.currentRegion.getStartKey();
78 if (startKey == null
79 || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
80 || checkScanStopRow(startKey) || done) {
81 close();
82 if (LOG.isDebugEnabled()) {
83 LOG.debug("Finished " + this.currentRegion);
84 }
85 return false;
86 }
87 localStartKey = startKey;
88 if (LOG.isDebugEnabled()) {
89 LOG.debug("Finished " + this.currentRegion);
90 }
91 } else {
92 localStartKey = this.scan.getStartRow();
93 if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) {
94 locateTheClosestFrontRow = false;
95 }
96 }
97
98 if (LOG.isDebugEnabled() && this.currentRegion != null) {
99
100 LOG.debug("Advancing internal scanner to startKey at '"
101 + Bytes.toStringBinary(localStartKey) + "'");
102 }
103 try {
104
105
106
107
108
109
110
111 byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
112 : null;
113 callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
114
115
116
117
118 this.caller.callWithoutRetries(callable, scannerTimeout);
119 this.currentRegion = callable.getHRegionInfo();
120 if (this.scanMetrics != null) {
121 this.scanMetrics.countOfRegions.incrementAndGet();
122 }
123 } catch (IOException e) {
124 ExceptionUtil.rethrowIfInterrupt(e);
125 close();
126 throw e;
127 }
128 return true;
129 }
130
131 protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,
132 int nbRows, byte[] locateStartRow) {
133 scan.setStartRow(localStartKey);
134 ScannerCallable s =
135 new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
136 locateStartRow, this.rpcControllerFactory);
137 s.setCaching(nbRows);
138 ScannerCallableWithReplicas sr =
139 new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool,
140 primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(),
141 caller);
142 return sr;
143 }
144
145 @Override
146
147 protected boolean checkScanStopRow(final byte[] startKey) {
148 if (this.scan.getStopRow().length > 0) {
149
150 byte[] stopRow = scan.getStopRow();
151 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0,
152 startKey.length);
153 if (cmp >= 0) {
154
155
156 return true;
157 }
158 }
159 return false;
160 }
161 }