001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 026 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HBaseIOException; 032import org.apache.hadoop.hbase.HRegionInfo; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.NotServingRegionException; 035import org.apache.hadoop.hbase.RegionLocations; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.UnknownScannerException; 039import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 040import org.apache.hadoop.hbase.exceptions.ScannerResetException; 041import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 042import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 052 053/** 054 * Scanner operations such as create, next, etc. 055 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as 056 * {@link RpcRetryingCaller} so fails are retried. 057 */ 058@InterfaceAudience.Private 059public class ScannerCallable extends ClientServiceCallable<Result[]> { 060 public static final String LOG_SCANNER_LATENCY_CUTOFF 061 = "hbase.client.log.scanner.latency.cutoff"; 062 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; 063 064 // Keeping LOG public as it is being used in TestScannerHeartbeatMessages 065 public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class); 066 protected long scannerId = -1L; 067 protected boolean instantiated = false; 068 protected boolean closed = false; 069 protected boolean renew = false; 070 protected final Scan scan; 071 private int caching = 1; 072 protected ScanMetrics scanMetrics; 073 private boolean logScannerActivity = false; 074 private int logCutOffLatency = 1000; 075 protected final int id; 076 077 enum MoreResults { 078 YES, NO, UNKNOWN 079 } 080 081 private MoreResults moreResultsInRegion; 082 private MoreResults moreResultsForScan; 083 084 /** 085 * Saves whether or not the most recent response from the server was a heartbeat message. 086 * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} 087 */ 088 protected boolean heartbeatMessage = false; 089 090 protected Cursor cursor; 091 092 // indicate if it is a remote server call 093 protected boolean isRegionServerRemote = true; 094 private long nextCallSeq = 0; 095 protected final RpcControllerFactory rpcControllerFactory; 096 097 /** 098 * @param connection which connection 099 * @param tableName table callable is on 100 * @param scan the scan to execute 101 * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect 102 * metrics 103 * @param rpcControllerFactory factory to use when creating 104 * {@link com.google.protobuf.RpcController} 105 */ 106 public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, 107 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { 108 super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority()); 109 this.id = id; 110 this.scan = scan; 111 this.scanMetrics = scanMetrics; 112 Configuration conf = connection.getConfiguration(); 113 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false); 114 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000); 115 this.rpcControllerFactory = rpcControllerFactory; 116 } 117 118 protected final HRegionLocation getLocationForReplica(RegionLocations locs) 119 throws HBaseIOException { 120 HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null; 121 if (loc == null || loc.getServerName() == null) { 122 // With this exception, there will be a retry. The location can be null for a replica 123 // when the table is created or after a split. 124 throw new HBaseIOException("There is no location for replica id #" + id); 125 } 126 return loc; 127 } 128 129 protected final RegionLocations getRegionLocations(boolean reload, byte[] row) 130 throws IOException { 131 return RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, getConnection(), 132 getTableName(), row); 133 } 134 135 /** 136 * @param reload force reload of server location 137 */ 138 @Override 139 public void prepare(boolean reload) throws IOException { 140 if (Thread.interrupted()) { 141 throw new InterruptedIOException(); 142 } 143 RegionLocations rl = getRegionLocations(reload, getRow()); 144 location = getLocationForReplica(rl); 145 ServerName dest = location.getServerName(); 146 setStub(super.getConnection().getClient(dest)); 147 if (!instantiated || reload) { 148 checkIfRegionServerIsRemote(); 149 instantiated = true; 150 } 151 cursor = null; 152 // check how often we retry. 153 if (reload) { 154 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 155 } 156 } 157 158 /** 159 * compare the local machine hostname with region server's hostname to decide if hbase client 160 * connects to a remote region server 161 */ 162 protected void checkIfRegionServerIsRemote() { 163 isRegionServerRemote = isRemote(getLocation().getHostname()); 164 } 165 166 private ScanResponse next() throws IOException { 167 // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server 168 setHeartbeatMessage(false); 169 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 170 ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, 171 this.scanMetrics != null, renew, scan.getLimit()); 172 try { 173 ScanResponse response = getStub().scan(getRpcController(), request); 174 nextCallSeq++; 175 return response; 176 } catch (Exception e) { 177 IOException ioe = ProtobufUtil.handleRemoteException(e); 178 if (logScannerActivity) { 179 LOG.info( 180 "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(), 181 e); 182 } 183 if (logScannerActivity) { 184 if (ioe instanceof UnknownScannerException) { 185 try { 186 HRegionLocation location = 187 getConnection().relocateRegion(getTableName(), scan.getStartRow()); 188 LOG.info("Scanner=" + scannerId + " expired, current region location is " 189 + location.toString()); 190 } catch (Throwable t) { 191 LOG.info("Failed to relocate region", t); 192 } 193 } else if (ioe instanceof ScannerResetException) { 194 LOG.info("Scanner=" + scannerId + " has received an exception, and the server " 195 + "asked us to reset the scanner state.", 196 ioe); 197 } 198 } 199 // The below convertion of exceptions into DoNotRetryExceptions is a little strange. 200 // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want 201 // ServerCallable#withRetries to just retry when it gets these exceptions. In here in 202 // a scan when doing a next in particular, we want to break out and get the scanner to 203 // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, 204 // yeah and hard to follow and in need of a refactor). 205 if (ioe instanceof NotServingRegionException) { 206 // Throw a DNRE so that we break out of cycle of calling NSRE 207 // when what we need is to open scanner against new location. 208 // Attach NSRE to signal client that it needs to re-setup scanner. 209 if (this.scanMetrics != null) { 210 this.scanMetrics.countOfNSRE.incrementAndGet(); 211 } 212 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); 213 } else if (ioe instanceof RegionServerStoppedException) { 214 // Throw a DNRE so that we break out of cycle of the retries and instead go and 215 // open scanner against new location. 216 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); 217 } else { 218 // The outer layers will retry 219 throw ioe; 220 } 221 } 222 } 223 224 private void setAlreadyClosed() { 225 this.scannerId = -1L; 226 this.closed = true; 227 } 228 229 @Override 230 protected Result[] rpcCall() throws Exception { 231 if (Thread.interrupted()) { 232 throw new InterruptedIOException(); 233 } 234 if (closed) { 235 close(); 236 return null; 237 } 238 ScanResponse response; 239 if (this.scannerId == -1L) { 240 response = openScanner(); 241 } else { 242 response = next(); 243 } 244 long timestamp = System.currentTimeMillis(); 245 boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); 246 setHeartbeatMessage(isHeartBeat); 247 if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { 248 cursor = ProtobufUtil.toCursor(response.getCursor()); 249 } 250 Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); 251 if (logScannerActivity) { 252 long now = System.currentTimeMillis(); 253 if (now - timestamp > logCutOffLatency) { 254 int rows = rrs == null ? 0 : rrs.length; 255 LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" 256 + scannerId); 257 } 258 } 259 updateServerSideMetrics(scanMetrics, response); 260 // moreResults is only used for the case where a filter exhausts all elements 261 if (response.hasMoreResults()) { 262 if (response.getMoreResults()) { 263 setMoreResultsForScan(MoreResults.YES); 264 } else { 265 setMoreResultsForScan(MoreResults.NO); 266 setAlreadyClosed(); 267 } 268 } else { 269 setMoreResultsForScan(MoreResults.UNKNOWN); 270 } 271 if (response.hasMoreResultsInRegion()) { 272 if (response.getMoreResultsInRegion()) { 273 setMoreResultsInRegion(MoreResults.YES); 274 } else { 275 setMoreResultsInRegion(MoreResults.NO); 276 setAlreadyClosed(); 277 } 278 } else { 279 setMoreResultsInRegion(MoreResults.UNKNOWN); 280 } 281 updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); 282 return rrs; 283 } 284 285 /** 286 * @return true when the most recent RPC response indicated that the response was a heartbeat 287 * message. Heartbeat messages are sent back from the server when the processing of the 288 * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid 289 * timeouts during long running scan operations. 290 */ 291 boolean isHeartbeatMessage() { 292 return heartbeatMessage; 293 } 294 295 public Cursor getCursor() { 296 return cursor; 297 } 298 299 private void setHeartbeatMessage(boolean heartbeatMessage) { 300 this.heartbeatMessage = heartbeatMessage; 301 } 302 303 private void close() { 304 if (this.scannerId == -1L) { 305 return; 306 } 307 try { 308 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 309 ScanRequest request = 310 RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); 311 try { 312 getStub().scan(getRpcController(), request); 313 } catch (Exception e) { 314 throw ProtobufUtil.handleRemoteException(e); 315 } 316 } catch (IOException e) { 317 TableName table = getTableName(); 318 String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString()); 319 LOG.warn("Ignore, probably already closed. Current scan: " + getScan().toString() 320 + tableDetails, e); 321 } 322 this.scannerId = -1L; 323 } 324 325 private ScanResponse openScanner() throws IOException { 326 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 327 ScanRequest request = RequestConverter.buildScanRequest( 328 getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); 329 try { 330 ScanResponse response = getStub().scan(getRpcController(), request); 331 long id = response.getScannerId(); 332 if (logScannerActivity) { 333 LOG.info("Open scanner=" + id + " for scan=" + scan.toString() 334 + " on region " + getLocation().toString()); 335 } 336 if (response.hasMvccReadPoint()) { 337 this.scan.setMvccReadPoint(response.getMvccReadPoint()); 338 } 339 this.scannerId = id; 340 return response; 341 } catch (Exception e) { 342 throw ProtobufUtil.handleRemoteException(e); 343 } 344 } 345 346 protected Scan getScan() { 347 return scan; 348 } 349 350 /** 351 * Call this when the next invocation of call should close the scanner 352 */ 353 public void setClose() { 354 this.closed = true; 355 } 356 357 /** 358 * Indicate whether we make a call only to renew the lease, but without affected the scanner in 359 * any other way. 360 * @param val true if only the lease should be renewed 361 */ 362 public void setRenew(boolean val) { 363 this.renew = val; 364 } 365 366 /** 367 * @return the HRegionInfo for the current region 368 */ 369 @Override 370 public HRegionInfo getHRegionInfo() { 371 if (!instantiated) { 372 return null; 373 } 374 return getLocation().getRegionInfo(); 375 } 376 377 /** 378 * Get the number of rows that will be fetched on next 379 * @return the number of rows for caching 380 */ 381 public int getCaching() { 382 return caching; 383 } 384 385 /** 386 * Set the number of rows that will be fetched on next 387 * @param caching the number of rows for caching 388 */ 389 public void setCaching(int caching) { 390 this.caching = caching; 391 } 392 393 public ScannerCallable getScannerCallableForReplica(int id) { 394 ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), 395 this.getScan(), this.scanMetrics, this.rpcControllerFactory, id); 396 s.setCaching(this.caching); 397 return s; 398 } 399 400 /** 401 * Should the client attempt to fetch more results from this region 402 */ 403 MoreResults moreResultsInRegion() { 404 return moreResultsInRegion; 405 } 406 407 void setMoreResultsInRegion(MoreResults moreResults) { 408 this.moreResultsInRegion = moreResults; 409 } 410 411 /** 412 * Should the client attempt to fetch more results for the whole scan. 413 */ 414 MoreResults moreResultsForScan() { 415 return moreResultsForScan; 416 } 417 418 void setMoreResultsForScan(MoreResults moreResults) { 419 this.moreResultsForScan = moreResults; 420 } 421}