1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import com.google.protobuf.ServiceException;
22 import com.google.protobuf.TextFormat;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.HRegionLocation;
31 import org.apache.hadoop.hbase.RemoteExceptionHandler;
32 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
33 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
35 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
36 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.RequestConverter;
39 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
40 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
41 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
42 import org.apache.hadoop.ipc.RemoteException;
43 import org.apache.hadoop.net.DNS;
44
45 import java.io.IOException;
46 import java.net.UnknownHostException;
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Stable
54 public class ScannerCallable extends ServerCallable<Result[]> {
55 public static final String LOG_SCANNER_LATENCY_CUTOFF
56 = "hbase.client.log.scanner.latency.cutoff";
57 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
58
59 public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
60 private long scannerId = -1L;
61 private boolean instantiated = false;
62 private boolean closed = false;
63 private Scan scan;
64 private int caching = 1;
65 private ScanMetrics scanMetrics;
66 private boolean logScannerActivity = false;
67 private int logCutOffLatency = 1000;
68
69
70 private boolean isRegionServerRemote = true;
71 private long nextCallSeq = 0;
72
73
74
75
76
77
78
79
80 public ScannerCallable (HConnection connection, byte [] tableName, Scan scan,
81 ScanMetrics scanMetrics) {
82 super(connection, tableName, scan.getStartRow());
83 this.scan = scan;
84 this.scanMetrics = scanMetrics;
85 Configuration conf = connection.getConfiguration();
86 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
87 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
88 }
89
90
91
92
93
94 @Override
95 public void prepare(boolean reload) throws IOException {
96 if (!instantiated || reload) {
97 super.prepare(reload);
98 checkIfRegionServerIsRemote();
99 instantiated = true;
100 }
101
102
103
104
105 if (reload && this.scanMetrics != null) {
106 this.scanMetrics.countOfRPCRetries.incrementAndGet();
107 if (isRegionServerRemote) {
108 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
109 }
110 }
111 }
112
113
114
115
116
117
118 private void checkIfRegionServerIsRemote() throws UnknownHostException {
119 String myAddress = DNS.getDefaultHost("default", "default");
120 if (this.location.getHostname().equalsIgnoreCase(myAddress)) {
121 isRegionServerRemote = false;
122 } else {
123 isRegionServerRemote = true;
124 }
125 }
126
127
128
129
130 public Result [] call() throws IOException {
131 if (closed) {
132 if (scannerId != -1) {
133 close();
134 }
135 } else {
136 if (scannerId == -1L) {
137 this.scannerId = openScanner();
138 } else {
139 Result [] rrs = null;
140 ScanRequest request = null;
141 try {
142 incRPCcallsMetrics();
143 request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
144 ScanResponse response = null;
145 try {
146 response = stub.scan(null, request);
147
148
149
150
151
152
153
154
155
156 nextCallSeq++;
157 long timestamp = System.currentTimeMillis();
158 rrs = ResponseConverter.getResults(response);
159 if (logScannerActivity) {
160 long now = System.currentTimeMillis();
161 if (now - timestamp > logCutOffLatency) {
162 int rows = rrs == null ? 0 : rrs.length;
163 LOG.info("Took " + (now-timestamp) + "ms to fetch "
164 + rows + " rows from scanner=" + scannerId);
165 }
166 }
167 if (response.hasMoreResults()
168 && !response.getMoreResults()) {
169 scannerId = -1L;
170 closed = true;
171 return null;
172 }
173 } catch (ServiceException se) {
174 throw ProtobufUtil.getRemoteException(se);
175 }
176 updateResultsMetrics(response);
177 } catch (IOException e) {
178 if (logScannerActivity) {
179 LOG.info("Got exception making request " + TextFormat.shortDebugString(request), e);
180 }
181 IOException ioe = e;
182 if (e instanceof RemoteException) {
183 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
184 }
185 if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
186 try {
187 HRegionLocation location =
188 connection.relocateRegion(tableName, scan.getStartRow());
189 LOG.info("Scanner=" + scannerId
190 + " expired, current region location is " + location.toString()
191 + " ip:" + location.getHostnamePort());
192 } catch (Throwable t) {
193 LOG.info("Failed to relocate region", t);
194 }
195 }
196
197
198
199
200
201
202 if (ioe instanceof NotServingRegionException) {
203
204
205
206 if (this.scanMetrics != null) {
207 this.scanMetrics.countOfNSRE.incrementAndGet();
208 }
209 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
210 } else if (ioe instanceof RegionServerStoppedException) {
211
212
213 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
214 } else {
215
216 throw ioe;
217 }
218 }
219 return rrs;
220 }
221 }
222 return null;
223 }
224
225 private void incRPCcallsMetrics() {
226 if (this.scanMetrics == null) {
227 return;
228 }
229 this.scanMetrics.countOfRPCcalls.incrementAndGet();
230 if (isRegionServerRemote) {
231 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
232 }
233 }
234
235 private void updateResultsMetrics(ScanResponse response) {
236 if (this.scanMetrics == null || !response.hasResultSizeBytes()) {
237 return;
238 }
239 long value = response.getResultSizeBytes();
240 this.scanMetrics.countOfBytesInResults.addAndGet(value);
241 if (isRegionServerRemote) {
242 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(value);
243 }
244 }
245
246 private void close() {
247 if (this.scannerId == -1L) {
248 return;
249 }
250 try {
251 incRPCcallsMetrics();
252 ScanRequest request =
253 RequestConverter.buildScanRequest(this.scannerId, 0, true);
254 try {
255 stub.scan(null, request);
256 } catch (ServiceException se) {
257 throw ProtobufUtil.getRemoteException(se);
258 }
259 } catch (IOException e) {
260 LOG.warn("Ignore, probably already closed", e);
261 }
262 this.scannerId = -1L;
263 }
264
265 protected long openScanner() throws IOException {
266 incRPCcallsMetrics();
267 ScanRequest request =
268 RequestConverter.buildScanRequest(
269 this.location.getRegionInfo().getRegionName(),
270 this.scan, 0, false);
271 try {
272 ScanResponse response = stub.scan(null, request);
273 long id = response.getScannerId();
274 if (logScannerActivity) {
275 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
276 + " on region " + this.location.toString() + " ip:"
277 + this.location.getHostnamePort());
278 }
279 return id;
280 } catch (ServiceException se) {
281 throw ProtobufUtil.getRemoteException(se);
282 }
283 }
284
285 protected Scan getScan() {
286 return scan;
287 }
288
289
290
291
292 public void setClose() {
293 this.closed = true;
294 }
295
296
297
298
299 public HRegionInfo getHRegionInfo() {
300 if (!instantiated) {
301 return null;
302 }
303 return location.getRegionInfo();
304 }
305
306
307
308
309
310 public int getCaching() {
311 return caching;
312 }
313
314
315
316
317
318 public void setCaching(int caching) {
319 this.caching = caching;
320 }
321 }