1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.UnknownHostException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellScanner;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.DoNotRetryIOException;
33 import org.apache.hadoop.hbase.HBaseIOException;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.NotServingRegionException;
37 import org.apache.hadoop.hbase.RegionLocations;
38 import org.apache.hadoop.hbase.RemoteExceptionHandler;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.UnknownScannerException;
42 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
43 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
44 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.RequestConverter;
48 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
49 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
50 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
51 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
52 import org.apache.hadoop.ipc.RemoteException;
53 import org.apache.hadoop.net.DNS;
54
55 import com.google.protobuf.ServiceException;
56 import com.google.protobuf.TextFormat;
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class ScannerCallable extends RegionServerCallable<Result[]> {
65 public static final String LOG_SCANNER_LATENCY_CUTOFF
66 = "hbase.client.log.scanner.latency.cutoff";
67 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
68
69 public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
70 protected long scannerId = -1L;
71 protected boolean instantiated = false;
72 protected boolean closed = false;
73 protected boolean renew = false;
74 private Scan scan;
75 private int caching = 1;
76 protected final ClusterConnection cConnection;
77 protected ScanMetrics scanMetrics;
78 private boolean logScannerActivity = false;
79 private int logCutOffLatency = 1000;
80 private static String myAddress;
81 protected final int id;
82 protected boolean serverHasMoreResultsContext;
83 protected boolean serverHasMoreResults;
84
85
86
87
88
89 protected boolean heartbeatMessage = false;
90 static {
91 try {
92 myAddress = DNS.getDefaultHost("default", "default");
93 } catch (UnknownHostException uhe) {
94 LOG.error("cannot determine my address", uhe);
95 }
96 }
97
98
99 protected boolean isRegionServerRemote = true;
100 private long nextCallSeq = 0;
101 protected RpcControllerFactory controllerFactory;
102 protected PayloadCarryingRpcController controller;
103
104
105
106
107
108
109
110
111
112
113 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
114 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
115 this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
116 }
117
118
119
120
121
122
123
124
125 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
126 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
127 super(connection, tableName, scan.getStartRow());
128 this.id = id;
129 this.cConnection = connection;
130 this.scan = scan;
131 this.scanMetrics = scanMetrics;
132 Configuration conf = connection.getConfiguration();
133 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
134 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
135 this.controllerFactory = rpcControllerFactory;
136 }
137
138 PayloadCarryingRpcController getController() {
139 return controller;
140 }
141
142
143
144
145
146 @Override
147 public void prepare(boolean reload) throws IOException {
148 if (Thread.interrupted()) {
149 throw new InterruptedIOException();
150 }
151 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
152 id, getConnection(), getTableName(), getRow());
153 location = id < rl.size() ? rl.getRegionLocation(id) : null;
154 if (location == null || location.getServerName() == null) {
155
156
157 throw new HBaseIOException("There is no location for replica id #" + id);
158 }
159 ServerName dest = location.getServerName();
160 setStub(super.getConnection().getClient(dest));
161 if (!instantiated || reload) {
162 checkIfRegionServerIsRemote();
163 instantiated = true;
164 }
165
166
167
168
169 if (reload && this.scanMetrics != null) {
170 this.scanMetrics.countOfRPCRetries.incrementAndGet();
171 if (isRegionServerRemote) {
172 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
173 }
174 }
175 }
176
177
178
179
180
181 protected void checkIfRegionServerIsRemote() {
182 if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
183 isRegionServerRemote = false;
184 } else {
185 isRegionServerRemote = true;
186 }
187 }
188
189
190 @Override
191 public Result [] call(int callTimeout) throws IOException {
192 if (Thread.interrupted()) {
193 throw new InterruptedIOException();
194 }
195 if (closed) {
196 if (scannerId != -1) {
197 close();
198 }
199 } else {
200 if (scannerId == -1L) {
201 this.scannerId = openScanner();
202 } else {
203 Result [] rrs = null;
204 ScanRequest request = null;
205
206 setHeartbeatMessage(false);
207 try {
208 incRPCcallsMetrics();
209 request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, renew);
210 ScanResponse response = null;
211 controller = controllerFactory.newController();
212 controller.setPriority(getTableName());
213 controller.setCallTimeout(callTimeout);
214 try {
215 response = getStub().scan(controller, request);
216
217
218
219
220
221
222
223
224
225 nextCallSeq++;
226 long timestamp = System.currentTimeMillis();
227 setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
228
229 CellScanner cellScanner = controller.cellScanner();
230 rrs = ResponseConverter.getResults(cellScanner, response);
231 if (logScannerActivity) {
232 long now = System.currentTimeMillis();
233 if (now - timestamp > logCutOffLatency) {
234 int rows = rrs == null ? 0 : rrs.length;
235 LOG.info("Took " + (now-timestamp) + "ms to fetch "
236 + rows + " rows from scanner=" + scannerId);
237 }
238 }
239
240 if (response.hasMoreResults() && !response.getMoreResults()) {
241 scannerId = -1L;
242 closed = true;
243
244 return null;
245 }
246
247
248 if (response.hasMoreResultsInRegion()) {
249
250 setHasMoreResultsContext(true);
251 setServerHasMoreResults(response.getMoreResultsInRegion());
252 } else {
253
254 setHasMoreResultsContext(false);
255 }
256 } catch (ServiceException se) {
257 throw ProtobufUtil.getRemoteException(se);
258 }
259 updateResultsMetrics(rrs);
260 } catch (IOException e) {
261 if (logScannerActivity) {
262 LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
263 + " to " + getLocation(), e);
264 }
265 IOException ioe = e;
266 if (e instanceof RemoteException) {
267 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
268 }
269 if (logScannerActivity) {
270 if (ioe instanceof UnknownScannerException) {
271 try {
272 HRegionLocation location =
273 getConnection().relocateRegion(getTableName(), scan.getStartRow());
274 LOG.info("Scanner=" + scannerId
275 + " expired, current region location is " + location.toString());
276 } catch (Throwable t) {
277 LOG.info("Failed to relocate region", t);
278 }
279 } else if (ioe instanceof ScannerResetException) {
280 LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
281 + "asked us to reset the scanner state.", ioe);
282 }
283 }
284
285
286
287
288
289
290 if (ioe instanceof NotServingRegionException) {
291
292
293
294 if (this.scanMetrics != null) {
295 this.scanMetrics.countOfNSRE.incrementAndGet();
296 }
297 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
298 } else if (ioe instanceof RegionServerStoppedException) {
299
300
301 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
302 } else {
303
304 throw ioe;
305 }
306 }
307 return rrs;
308 }
309 }
310 return null;
311 }
312
313
314
315
316
317
318
319 protected boolean isHeartbeatMessage() {
320 return heartbeatMessage;
321 }
322
323 protected void setHeartbeatMessage(boolean heartbeatMessage) {
324 this.heartbeatMessage = heartbeatMessage;
325 }
326
327 private void incRPCcallsMetrics() {
328 if (this.scanMetrics == null) {
329 return;
330 }
331 this.scanMetrics.countOfRPCcalls.incrementAndGet();
332 if (isRegionServerRemote) {
333 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
334 }
335 }
336
337 protected void updateResultsMetrics(Result[] rrs) {
338 if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
339 return;
340 }
341 long resultSize = 0;
342 for (Result rr : rrs) {
343 for (Cell cell : rr.rawCells()) {
344 resultSize += CellUtil.estimatedSerializedSizeOf(cell);
345 }
346 }
347 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
348 if (isRegionServerRemote) {
349 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
350 }
351 }
352
353 private void close() {
354 if (this.scannerId == -1L) {
355 return;
356 }
357 try {
358 incRPCcallsMetrics();
359 ScanRequest request =
360 RequestConverter.buildScanRequest(this.scannerId, 0, true);
361 try {
362 getStub().scan(null, request);
363 } catch (ServiceException se) {
364 throw ProtobufUtil.getRemoteException(se);
365 }
366 } catch (IOException e) {
367 LOG.warn("Ignore, probably already closed", e);
368 }
369 this.scannerId = -1L;
370 }
371
372 protected long openScanner() throws IOException {
373 incRPCcallsMetrics();
374 ScanRequest request =
375 RequestConverter.buildScanRequest(
376 getLocation().getRegionInfo().getRegionName(),
377 this.scan, 0, false);
378 try {
379 ScanResponse response = getStub().scan(null, request);
380 long id = response.getScannerId();
381 if (logScannerActivity) {
382 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
383 + " on region " + getLocation().toString());
384 }
385 return id;
386 } catch (ServiceException se) {
387 throw ProtobufUtil.getRemoteException(se);
388 }
389 }
390
391 protected Scan getScan() {
392 return scan;
393 }
394
395
396
397
398 public void setClose() {
399 this.closed = true;
400 }
401
402
403
404
405
406
407 public void setRenew(boolean val) {
408 this.renew = val;
409 }
410
411
412
413
414 @Override
415 public HRegionInfo getHRegionInfo() {
416 if (!instantiated) {
417 return null;
418 }
419 return getLocation().getRegionInfo();
420 }
421
422
423
424
425
426 public int getCaching() {
427 return caching;
428 }
429
430 @Override
431 public ClusterConnection getConnection() {
432 return cConnection;
433 }
434
435
436
437
438
439 public void setCaching(int caching) {
440 this.caching = caching;
441 }
442
443 public ScannerCallable getScannerCallableForReplica(int id) {
444 ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
445 this.getScan(), this.scanMetrics, controllerFactory, id);
446 s.setCaching(this.caching);
447 return s;
448 }
449
450
451
452
453
454 protected boolean getServerHasMoreResults() {
455 assert serverHasMoreResultsContext;
456 return this.serverHasMoreResults;
457 }
458
459 protected void setServerHasMoreResults(boolean serverHasMoreResults) {
460 this.serverHasMoreResults = serverHasMoreResults;
461 }
462
463
464
465
466
467
468 protected boolean hasMoreResultsContext() {
469 return serverHasMoreResultsContext;
470 }
471
472 protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
473 this.serverHasMoreResultsContext = serverHasMoreResultsContext;
474 }
475 }