1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.CellUtil;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
32 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
33 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34 import org.apache.hadoop.hbase.util.Bytes;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38 import java.io.IOException;
39 import java.util.concurrent.ExecutorService;
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class ClientSmallReversedScanner extends ReversedClientScanner {
50 private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
51 private ScannerCallableWithReplicas smallScanCallable = null;
52 private SmallScannerCallableFactory callableFactory;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
78 final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
79 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
80 throws IOException {
81 this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
82 primaryOperationTimeout, new SmallScannerCallableFactory());
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 @VisibleForTesting
111 ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
112 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
113 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
114 SmallScannerCallableFactory callableFactory) throws IOException {
115 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
116 primaryOperationTimeout);
117 this.callableFactory = callableFactory;
118 }
119
120
121
122
123
124
125
126
127
128
129
130
131
132 private boolean nextScanner(int nbRows, final boolean done,
133 boolean currentRegionDone) throws IOException {
134
135 byte[] localStartKey;
136 int cacheNum = nbRows;
137 boolean regionChanged = true;
138
139 if (this.currentRegion != null && currentRegionDone) {
140 byte[] startKey = this.currentRegion.getStartKey();
141 if (startKey == null
142 || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
143 || checkScanStopRow(startKey) || done) {
144 close();
145 if (LOG.isDebugEnabled()) {
146 LOG.debug("Finished with small scan at " + this.currentRegion);
147 }
148 return false;
149 }
150
151 localStartKey = createClosestRowBefore(startKey);
152 if (LOG.isDebugEnabled()) {
153 LOG.debug("Finished with region " + this.currentRegion);
154 }
155 } else if (this.lastResult != null) {
156 regionChanged = false;
157 localStartKey = createClosestRowBefore(lastResult.getRow());
158 } else {
159 localStartKey = this.scan.getStartRow();
160 }
161
162 if (LOG.isTraceEnabled()) {
163 LOG.trace("Advancing internal small scanner to startKey at '"
164 + Bytes.toStringBinary(localStartKey) + "'");
165 }
166
167 smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
168 getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
169 getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
170
171 if (this.scanMetrics != null && regionChanged) {
172 this.scanMetrics.countOfRegions.incrementAndGet();
173 }
174 return true;
175 }
176
177 @Override
178 public Result next() throws IOException {
179
180
181 if (cache.size() == 0 && this.closed) {
182 return null;
183 }
184 if (cache.size() == 0) {
185 loadCache();
186 }
187
188 if (cache.size() > 0) {
189 return cache.poll();
190 }
191
192
193 writeScanMetrics();
194 return null;
195 }
196
197 @Override
198 protected void loadCache() throws IOException {
199 Result[] values = null;
200 long remainingResultSize = maxScannerResultSize;
201 int countdown = this.caching;
202 boolean currentRegionDone = false;
203
204 while (remainingResultSize > 0 && countdown > 0
205 && nextScanner(countdown, values == null, currentRegionDone)) {
206
207
208
209
210
211 values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
212 this.currentRegion = smallScanCallable.getHRegionInfo();
213 long currentTime = System.currentTimeMillis();
214 if (this.scanMetrics != null) {
215 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
216 - lastNext);
217 }
218 lastNext = currentTime;
219 if (values != null && values.length > 0) {
220 for (int i = 0; i < values.length; i++) {
221 Result rs = values[i];
222 cache.add(rs);
223
224 for (Cell cell : rs.rawCells()) {
225 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
226 }
227 countdown--;
228 this.lastResult = rs;
229 }
230 }
231 if (smallScanCallable.hasMoreResultsContext()) {
232 currentRegionDone = !smallScanCallable.getServerHasMoreResults();
233 } else {
234 currentRegionDone = countdown > 0;
235 }
236 }
237 }
238
239 @Override
240 protected void initializeScannerInConstruction() throws IOException {
241
242
243 }
244
245 @Override
246 public void close() {
247 if (!scanMetricsPublished) writeScanMetrics();
248 closed = true;
249 }
250
251 @VisibleForTesting
252 protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
253 this.callableFactory = callableFactory;
254 }
255 }