1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import com.google.protobuf.ServiceException;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellUtil;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
33 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.RequestConverter;
37 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.util.Bytes;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43 import java.io.IOException;
44 import java.io.InterruptedIOException;
45 import java.util.concurrent.ExecutorService;
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class ClientSmallReversedScanner extends ReversedClientScanner {
57 private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
58 private ScannerCallableWithReplicas smallReversedScanCallable = null;
59 private SmallReversedScannerCallableFactory callableFactory;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
85 final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
86 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
87 throws IOException {
88 this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
89 primaryOperationTimeout, new SmallReversedScannerCallableFactory());
90 }
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 @VisibleForTesting
118 ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
119 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
121 SmallReversedScannerCallableFactory callableFactory) throws IOException {
122 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
123 primaryOperationTimeout);
124 this.callableFactory = callableFactory;
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138
139 private boolean nextScanner(int nbRows, final boolean done,
140 boolean currentRegionDone) throws IOException {
141
142 byte[] localStartKey;
143 int cacheNum = nbRows;
144 boolean regionChanged = true;
145 boolean isFirstRegionToLocate = false;
146
147 if (this.currentRegion != null && currentRegionDone) {
148 byte[] startKey = this.currentRegion.getStartKey();
149 if (startKey == null
150 || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
151 || checkScanStopRow(startKey) || done) {
152 close();
153 if (LOG.isDebugEnabled()) {
154 LOG.debug("Finished with small scan at " + this.currentRegion);
155 }
156 return false;
157 }
158
159 localStartKey = createClosestRowBefore(startKey);
160 if (LOG.isDebugEnabled()) {
161 LOG.debug("Finished with region " + this.currentRegion);
162 }
163 } else if (this.lastResult != null) {
164 regionChanged = false;
165 localStartKey = createClosestRowBefore(lastResult.getRow());
166 } else {
167 localStartKey = this.scan.getStartRow();
168 isFirstRegionToLocate = true;
169 }
170
171 if (!isFirstRegionToLocate
172 && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
173
174
175 return false;
176 }
177
178 if (LOG.isTraceEnabled()) {
179 LOG.trace("Advancing internal small scanner to startKey at '"
180 + Bytes.toStringBinary(localStartKey) + "'");
181 }
182
183 smallReversedScanCallable =
184 callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
185 localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
186 getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
187
188 if (this.scanMetrics != null && regionChanged) {
189 this.scanMetrics.countOfRegions.incrementAndGet();
190 }
191 return true;
192 }
193
194 @Override
195 public Result next() throws IOException {
196
197
198 if (cache.size() == 0 && this.closed) {
199 return null;
200 }
201 if (cache.size() == 0) {
202 loadCache();
203 }
204
205 if (cache.size() > 0) {
206 return cache.poll();
207 }
208
209
210 writeScanMetrics();
211 return null;
212 }
213
214 @Override
215 protected void loadCache() throws IOException {
216 Result[] values = null;
217 long remainingResultSize = maxScannerResultSize;
218 int countdown = this.caching;
219 boolean currentRegionDone = false;
220
221 while (remainingResultSize > 0 && countdown > 0
222 && nextScanner(countdown, values == null, currentRegionDone)) {
223
224
225
226
227
228 values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
229 this.currentRegion = smallReversedScanCallable.getHRegionInfo();
230 long currentTime = System.currentTimeMillis();
231 if (this.scanMetrics != null) {
232 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
233 - lastNext);
234 }
235 lastNext = currentTime;
236 if (values != null && values.length > 0) {
237 for (int i = 0; i < values.length; i++) {
238 Result rs = values[i];
239 cache.add(rs);
240
241 for (Cell cell : rs.rawCells()) {
242 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
243 }
244 countdown--;
245 this.lastResult = rs;
246 }
247 }
248 if (smallReversedScanCallable.hasMoreResultsContext()) {
249 currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
250 } else {
251 currentRegionDone = countdown > 0;
252 }
253 }
254 }
255
256 @Override
257 protected void initializeScannerInConstruction() throws IOException {
258
259
260 }
261
262 @Override
263 public void close() {
264 if (!scanMetricsPublished) writeScanMetrics();
265 closed = true;
266 }
267
268 @VisibleForTesting
269 protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
270 this.callableFactory = callableFactory;
271 }
272
273
274
275
276 static class SmallReversedScannerCallable extends ReversedScannerCallable {
277
278 public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
279 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
280 int caching, int replicaId) {
281 super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
282 this.setCaching(caching);
283 }
284
285 @Override
286 public Result[] call(int timeout) throws IOException {
287 if (this.closed) return null;
288 if (Thread.interrupted()) {
289 throw new InterruptedIOException();
290 }
291 ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
292 getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
293 ClientProtos.ScanResponse response = null;
294 controller = controllerFactory.newController();
295 try {
296 controller.setPriority(getTableName());
297 controller.setCallTimeout(timeout);
298 response = getStub().scan(controller, request);
299 Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
300 if (response.hasMoreResultsInRegion()) {
301 setHasMoreResultsContext(true);
302 setServerHasMoreResults(response.getMoreResultsInRegion());
303 } else {
304 setHasMoreResultsContext(false);
305 }
306
307 updateResultsMetrics(results);
308 return results;
309 } catch (ServiceException se) {
310 throw ProtobufUtil.getRemoteException(se);
311 }
312 }
313
314 @Override
315 public ScannerCallable getScannerCallableForReplica(int id) {
316 return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
317 scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
318 }
319 }
320
321 protected static class SmallReversedScannerCallableFactory {
322
323 public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
324 Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
325 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
326 int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
327 boolean isFirstRegionToLocate) {
328 byte[] locateStartRow = null;
329 if (isFirstRegionToLocate
330 && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
331
332
333 locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
334 }
335
336 scan.setStartRow(localStartKey);
337 SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
338 scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
339 ScannerCallableWithReplicas scannerCallableWithReplicas =
340 new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
341 retries, scannerTimeout, cacheNum, conf, caller);
342 return scannerCallableWithReplicas;
343 }
344 }
345 }