1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.UnknownHostException;
24 import java.util.Map;
25 import java.util.Map.Entry;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HBaseIOException;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.NotServingRegionException;
39 import org.apache.hadoop.hbase.RegionLocations;
40 import org.apache.hadoop.hbase.RemoteExceptionHandler;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.UnknownScannerException;
44 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
45 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
46 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
47 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
48 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
49 import org.apache.hadoop.hbase.protobuf.RequestConverter;
50 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
51 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
52 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
53 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
54 import org.apache.hadoop.ipc.RemoteException;
55 import org.apache.hadoop.net.DNS;
56
57 import com.google.protobuf.ServiceException;
58 import com.google.protobuf.TextFormat;
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class ScannerCallable extends RegionServerCallable<Result[]> {
/**
 * Configuration key for the latency threshold, in milliseconds, above which a scan call is
 * logged (only consulted when scanner activity logging is enabled).
 */
public static final String LOG_SCANNER_LATENCY_CUTOFF =
    "hbase.client.log.scanner.latency.cutoff";

/** Configuration key that turns on informational logging of scanner activity. */
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

public static final Log LOG = LogFactory.getLog(ScannerCallable.class);

/** Identifier of the server-side scanner; {@code -1} while no scanner is open. */
protected long scannerId = -1L;
/** Set once {@link #prepare(boolean)} has resolved a location for this callable. */
protected boolean instantiated = false;
/** When true, the next {@link #call(int)} closes the scanner instead of fetching rows. */
protected boolean closed = false;
/** Forwarded verbatim into every "next" scan request built by {@link #call(int)}. */
protected boolean renew = false;
private Scan scan;
/** Row count passed to the scan request builder for each "next" call. */
private int caching = 1;
protected final ClusterConnection cConnection;
/** Optional metrics sink; every metrics update is skipped when this is null. */
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
/** Host name of this client as resolved at class-load time; stays null if resolution failed. */
private static String myAddress;
/** Replica id used to pick the region location in {@link #prepare(boolean)}. */
protected final int id;
/** True when the last response carried a "more results in region" field. */
protected boolean serverHasMoreResultsContext;
/** Value of the "more results in region" field; only meaningful when the context flag is set. */
protected boolean serverHasMoreResults;

/** Whether the most recent scan response was flagged as a heartbeat message. */
protected boolean heartbeatMessage = false;

static {
  // Resolve into a local first so the static field is written exactly once.
  String resolvedHost = null;
  try {
    resolvedHost = DNS.getDefaultHost("default", "default");
  } catch (UnknownHostException uhe) {
    LOG.error("cannot determine my address", uhe);
  }
  myAddress = resolvedHost;
}

/** Assumed remote until {@link #checkIfRegionServerIsRemote()} says otherwise. */
protected boolean isRegionServerRemote = true;
/** Sequence number sent with each "next" request and incremented after each RPC that returns. */
private long nextCallSeq = 0;
protected RpcControllerFactory controllerFactory;
/** Controller created for the most recent "next" RPC; null before the first one. */
protected PayloadCarryingRpcController controller;
106
107
108
109
110
111
112
113
114
115
/**
 * Creates a callable bound to replica id {@code 0}; simply delegates to the six-argument
 * constructor.
 *
 * @param connection which connection
 * @param tableName table the callable is for
 * @param scan the scan to execute
 * @param scanMetrics metrics sink to update, or null to collect no metrics
 * @param rpcControllerFactory factory used to create RPC controllers
 */
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
  this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
120
121
122
123
124
125
126
127
128 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
129 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
130 super(connection, tableName, scan.getStartRow());
131 this.id = id;
132 this.cConnection = connection;
133 this.scan = scan;
134 this.scanMetrics = scanMetrics;
135 Configuration conf = connection.getConfiguration();
136 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
137 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
138 this.controllerFactory = rpcControllerFactory;
139 }
140
141 PayloadCarryingRpcController getController() {
142 return controller;
143 }
144
145
146
147
148
149 @Override
150 public void prepare(boolean reload) throws IOException {
151 if (Thread.interrupted()) {
152 throw new InterruptedIOException();
153 }
154 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
155 id, getConnection(), getTableName(), getRow());
156 location = id < rl.size() ? rl.getRegionLocation(id) : null;
157 if (location == null || location.getServerName() == null) {
158
159
160 throw new HBaseIOException("There is no location for replica id #" + id);
161 }
162 ServerName dest = location.getServerName();
163 setStub(super.getConnection().getClient(dest));
164 if (!instantiated || reload) {
165 checkIfRegionServerIsRemote();
166 instantiated = true;
167 }
168
169
170
171
172 if (reload && this.scanMetrics != null) {
173 this.scanMetrics.countOfRPCRetries.incrementAndGet();
174 if (isRegionServerRemote) {
175 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
176 }
177 }
178 }
179
180
181
182
183
184 protected void checkIfRegionServerIsRemote() {
185 if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
186 isRegionServerRemote = false;
187 } else {
188 isRegionServerRemote = true;
189 }
190 }
191
192
193 @Override
194 public Result [] call(int callTimeout) throws IOException {
195 if (Thread.interrupted()) {
196 throw new InterruptedIOException();
197 }
198 if (closed) {
199 if (scannerId != -1) {
200 close();
201 }
202 } else {
203 if (scannerId == -1L) {
204 this.scannerId = openScanner();
205 } else {
206 Result [] rrs = null;
207 ScanRequest request = null;
208
209 setHeartbeatMessage(false);
210 try {
211 incRPCcallsMetrics();
212 request =
213 RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
214 this.scanMetrics != null, renew);
215 ScanResponse response = null;
216 controller = controllerFactory.newController();
217 controller.setPriority(getTableName());
218 controller.setCallTimeout(callTimeout);
219 try {
220 response = getStub().scan(controller, request);
221
222
223
224
225
226
227
228
229
230 nextCallSeq++;
231 long timestamp = System.currentTimeMillis();
232 setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
233
234 CellScanner cellScanner = controller.cellScanner();
235 rrs = ResponseConverter.getResults(cellScanner, response);
236 if (logScannerActivity) {
237 long now = System.currentTimeMillis();
238 if (now - timestamp > logCutOffLatency) {
239 int rows = rrs == null ? 0 : rrs.length;
240 LOG.info("Took " + (now-timestamp) + "ms to fetch "
241 + rows + " rows from scanner=" + scannerId);
242 }
243 }
244 updateServerSideMetrics(response);
245
246 if (response.hasMoreResults() && !response.getMoreResults()) {
247 scannerId = -1L;
248 closed = true;
249
250 return null;
251 }
252
253
254 if (response.hasMoreResultsInRegion()) {
255
256 setHasMoreResultsContext(true);
257 setServerHasMoreResults(response.getMoreResultsInRegion());
258 } else {
259
260 setHasMoreResultsContext(false);
261 }
262 } catch (ServiceException se) {
263 throw ProtobufUtil.getRemoteException(se);
264 }
265 updateResultsMetrics(rrs);
266 } catch (IOException e) {
267 if (logScannerActivity) {
268 LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
269 + " to " + getLocation(), e);
270 }
271 IOException ioe = e;
272 if (e instanceof RemoteException) {
273 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
274 }
275 if (logScannerActivity) {
276 if (ioe instanceof UnknownScannerException) {
277 try {
278 HRegionLocation location =
279 getConnection().relocateRegion(getTableName(), scan.getStartRow());
280 LOG.info("Scanner=" + scannerId
281 + " expired, current region location is " + location.toString());
282 } catch (Throwable t) {
283 LOG.info("Failed to relocate region", t);
284 }
285 } else if (ioe instanceof ScannerResetException) {
286 LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
287 + "asked us to reset the scanner state.", ioe);
288 }
289 }
290
291
292
293
294
295
296 if (ioe instanceof NotServingRegionException) {
297
298
299
300 if (this.scanMetrics != null) {
301 this.scanMetrics.countOfNSRE.incrementAndGet();
302 }
303 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
304 } else if (ioe instanceof RegionServerStoppedException) {
305
306
307 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
308 } else {
309
310 throw ioe;
311 }
312 }
313 return rrs;
314 }
315 }
316 return null;
317 }
318
319
320
321
322
323
324
325 protected boolean isHeartbeatMessage() {
326 return heartbeatMessage;
327 }
328
329 protected void setHeartbeatMessage(boolean heartbeatMessage) {
330 this.heartbeatMessage = heartbeatMessage;
331 }
332
333 private void incRPCcallsMetrics() {
334 if (this.scanMetrics == null) {
335 return;
336 }
337 this.scanMetrics.countOfRPCcalls.incrementAndGet();
338 if (isRegionServerRemote) {
339 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
340 }
341 }
342
343 protected void updateResultsMetrics(Result[] rrs) {
344 if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
345 return;
346 }
347 long resultSize = 0;
348 for (Result rr : rrs) {
349 for (Cell cell : rr.rawCells()) {
350 resultSize += CellUtil.estimatedSerializedSizeOf(cell);
351 }
352 }
353 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
354 if (isRegionServerRemote) {
355 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
356 }
357 }
358
359
360
361
362
363
364
365 private void updateServerSideMetrics(ScanResponse response) {
366 if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
367
368 Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
369 for (Entry<String, Long> entry : serverMetrics.entrySet()) {
370 this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
371 }
372 }
373
374 private void close() {
375 if (this.scannerId == -1L) {
376 return;
377 }
378 try {
379 incRPCcallsMetrics();
380 ScanRequest request =
381 RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
382 try {
383 getStub().scan(null, request);
384 } catch (ServiceException se) {
385 throw ProtobufUtil.getRemoteException(se);
386 }
387 } catch (IOException e) {
388 LOG.warn("Ignore, probably already closed", e);
389 }
390 this.scannerId = -1L;
391 }
392
393 protected long openScanner() throws IOException {
394 incRPCcallsMetrics();
395 ScanRequest request =
396 RequestConverter.buildScanRequest(
397 getLocation().getRegionInfo().getRegionName(),
398 this.scan, 0, false);
399 try {
400 ScanResponse response = getStub().scan(null, request);
401 long id = response.getScannerId();
402 if (logScannerActivity) {
403 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
404 + " on region " + getLocation().toString());
405 }
406 return id;
407 } catch (ServiceException se) {
408 throw ProtobufUtil.getRemoteException(se);
409 }
410 }
411
412 protected Scan getScan() {
413 return scan;
414 }
415
416
417
418
419 public void setClose() {
420 this.closed = true;
421 }
422
423
424
425
426
427
428 public void setRenew(boolean val) {
429 this.renew = val;
430 }
431
432
433
434
435 @Override
436 public HRegionInfo getHRegionInfo() {
437 if (!instantiated) {
438 return null;
439 }
440 return getLocation().getRegionInfo();
441 }
442
443
444
445
446
447 public int getCaching() {
448 return caching;
449 }
450
451 @Override
452 public ClusterConnection getConnection() {
453 return cConnection;
454 }
455
456
457
458
459
460 public void setCaching(int caching) {
461 this.caching = caching;
462 }
463
464 public ScannerCallable getScannerCallableForReplica(int id) {
465 ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
466 this.getScan(), this.scanMetrics, controllerFactory, id);
467 s.setCaching(this.caching);
468 return s;
469 }
470
471
472
473
474
475 protected boolean getServerHasMoreResults() {
476 assert serverHasMoreResultsContext;
477 return this.serverHasMoreResults;
478 }
479
480 protected void setServerHasMoreResults(boolean serverHasMoreResults) {
481 this.serverHasMoreResults = serverHasMoreResults;
482 }
483
484
485
486
487
488
489 protected boolean hasMoreResultsContext() {
490 return serverHasMoreResultsContext;
491 }
492
493 protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
494 this.serverHasMoreResultsContext = serverHasMoreResultsContext;
495 }
496 }