/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Scanner operations such as create, next, etc. Used by {@link ResultScanner}s made by
 * {@link Table}. Passed to a retrying caller such as {@link RpcRetryingCaller} so fails are
 * retried.
 * <p>
 * One instance drives the scan against a single region (or region replica, see {@link #id}):
 * {@link #rpcCall()} opens a server-side scanner on the first invocation, issues "next" calls on
 * subsequent invocations, and closes the scanner when {@link #setClose()} has been called.
 */
@InterfaceAudience.Private
public class ScannerCallable extends ClientServiceCallable<Result[]> {
  public static final String LOG_SCANNER_LATENCY_CUTOFF = "hbase.client.log.scanner.latency.cutoff";
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
  // Server-side scanner id; -1L means no scanner is currently open on the server.
  protected long scannerId = -1L;
  protected boolean instantiated = false;
  // When true, the next rpcCall() closes the server-side scanner instead of fetching rows.
  protected boolean closed = false;
  // When true, the next call only renews the scanner lease; see setRenew(boolean).
  protected boolean renew = false;
  protected final Scan scan;
  // Number of rows requested per next call; see setCaching(int).
  private int caching = 1;
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  // Region replica id this callable reads from (0 is the primary).
  protected final int id;

  /**
   * Tri-state answer to "are there more results?"; UNKNOWN when the server response did not say.
   */
  enum MoreResults {
    YES,
    NO,
    UNKNOWN
  }

  private MoreResults moreResultsInRegion;
  private MoreResults moreResultsForScan;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;

  // Cursor carried by a heartbeat response when the scan asked for cursor results; null otherwise.
  protected Cursor cursor;

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  // Sequence number sent with every next request; only incremented after a successful round trip.
  private long nextCallSeq = 0;
  protected final RpcControllerFactory rpcControllerFactory;

  /**
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't
   * collect metrics
   * @param rpcControllerFactory factory to use when creating
   * {@link com.google.protobuf.RpcController}
   * @param id the replica id of the region to scan (0 for the primary replica)
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
      scan.getPriority());
    this.id = id;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.rpcControllerFactory = rpcControllerFactory;
  }

  /**
   * Picks the location for this callable's replica {@link #id} out of the given locations,
   * throwing a retriable exception when the replica has no location yet.
   */
  protected final HRegionLocation getLocationForReplica(RegionLocations locs)
    throws HBaseIOException {
    HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
    if (loc == null || loc.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      // when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    return loc;
  }

  /**
   * Fetch region locations for the row. Since this is for prepare, we always useCache. This is
   * because we can be sure that RpcRetryingCaller will have cleared the cache in error handling if
   * this is a retry.
   * @param row the row to look up region location for
   */
  protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws IOException {
    // always use cache, because cache will have been cleared if necessary
    // in the try/catch before retrying
    return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
      getTableName(), row);
  }

  /**
   * @param reload force reload of server location
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }

    // On a forced reload, fail fast if the (non-meta) table has been disabled meanwhile.
    if (
      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
        && getConnection().isTableDisabled(getTableName())
    ) {
      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
    }

    RegionLocations rl = getRegionLocationsForPrepare(getRow());
    location = getLocationForReplica(rl);
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }
    // Any cursor from a previous response is stale once we (re)prepare.
    cursor = null;
    // check how often we retry.
    if (reload) {
      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
    }
  }

  /**
   * compare the local machine hostname with region server's hostname to decide if hbase client
   * connects to a remote region server
   */
  protected void checkIfRegionServerIsRemote() {
    isRegionServerRemote = isRemote(getLocation().getHostname());
  }

  /**
   * Issues a "next" RPC against the already-open server-side scanner, translating exceptions that
   * require a scanner re-setup into {@link DoNotRetryIOException} (see the comment inline).
   */
  private ScanResponse next() throws IOException {
    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
    setHeartbeatMessage(false);
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
      this.scanMetrics != null, renew, scan.getLimit());
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      // Only advance the call sequence after a successful round trip.
      nextCallSeq++;
      return response;
    } catch (Exception e) {
      IOException ioe = ProtobufUtil.handleRemoteException(e);
      if (logScannerActivity) {
        LOG.info(
          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
          e);
      }
      if (logScannerActivity) {
        if (ioe instanceof UnknownScannerException) {
          try {
            // Best-effort diagnostic: log where the region lives now. Failures here are only
            // logged, never propagated.
            HRegionLocation location =
              getConnection().relocateRegion(getTableName(), scan.getStartRow());
            LOG.info("Scanner=" + scannerId + " expired, current region location is "
              + location.toString());
          } catch (Throwable t) {
            LOG.info("Failed to relocate region", t);
          }
        } else if (ioe instanceof ScannerResetException) {
          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
            + "asked us to reset the scanner state.", ioe);
        }
      }
      // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
      // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
      // a scan when doing a next in particular, we want to break out and get the scanner to
      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
      // yeah and hard to follow and in need of a refactor).
      if (ioe instanceof NotServingRegionException) {
        // Throw a DNRE so that we break out of cycle of calling NSRE
        // when what we need is to open scanner against new location.
        // Attach NSRE to signal client that it needs to re-setup scanner.
        if (this.scanMetrics != null) {
          this.scanMetrics.countOfNSRE.incrementAndGet();
        }
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else if (ioe instanceof RegionServerStoppedException) {
        // Throw a DNRE so that we break out of cycle of the retries and instead go and
        // open scanner against new location.
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else {
        // The outer layers will retry
        throw ioe;
      }
    }
  }

  // Mark the scanner as gone on the server side so the next call neither fetches nor re-closes.
  private void setAlreadyClosed() {
    this.scannerId = -1L;
    this.closed = true;
  }

  /**
   * One scan RPC: opens the scanner on first use, otherwise fetches the next batch (or closes,
   * when {@link #setClose()} was called). Updates heartbeat/cursor/more-results state from the
   * response before returning the decoded rows.
   */
  @Override
  protected Result[] rpcCall() throws Exception {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      close();
      return null;
    }
    ScanResponse response;
    if (this.scannerId == -1L) {
      response = openScanner();
    } else {
      response = next();
    }
    long timestamp = EnvironmentEdgeManager.currentTime();
    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
    setHeartbeatMessage(isHeartBeat);
    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
      cursor = ProtobufUtil.toCursor(response.getCursor());
    }
    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
    if (logScannerActivity) {
      long now = EnvironmentEdgeManager.currentTime();
      if (now - timestamp > logCutOffLatency) {
        int rows = rrs == null ? 0 : rrs.length;
        LOG.info(
          "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
      }
    }
    updateServerSideMetrics(scanMetrics, response);
    // moreResults is only used for the case where a filter exhausts all elements
    if (response.hasMoreResults()) {
      if (response.getMoreResults()) {
        setMoreResultsForScan(MoreResults.YES);
      } else {
        setMoreResultsForScan(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsForScan(MoreResults.UNKNOWN);
    }
    if (response.hasMoreResultsInRegion()) {
      if (response.getMoreResultsInRegion()) {
        setMoreResultsInRegion(MoreResults.YES);
      } else {
        setMoreResultsInRegion(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsInRegion(MoreResults.UNKNOWN);
    }
    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
    return rrs;
  }

  /**
   * @return true when the most recent RPC response indicated that the response was a heartbeat
   *         message. Heartbeat messages are sent back from the server when the processing of the
   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
   *         timeouts during long running scan operations.
   */
  boolean isHeartbeatMessage() {
    return heartbeatMessage;
  }

  public Cursor getCursor() {
    return cursor;
  }

  private void setHeartbeatMessage(boolean heartbeatMessage) {
    this.heartbeatMessage = heartbeatMessage;
  }

  /**
   * Best-effort close of the server-side scanner; IOExceptions are logged and swallowed because
   * the scanner has most likely already been cleaned up on the server.
   */
  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
      ScanRequest request =
        RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
      HBaseRpcController controller = rpcControllerFactory.newController();

      // Set fields from the original controller onto the close-specific controller
      // We set the timeout and the priority -- we will overwrite the priority to HIGH
      // below, but the controller will take whichever is highest.
      if (getRpcController() instanceof HBaseRpcController) {
        HBaseRpcController original = (HBaseRpcController) getRpcController();
        controller.setPriority(original.getPriority());
        if (original.hasCallTimeout()) {
          controller.setCallTimeout(original.getCallTimeout());
        }
      }
      controller.setPriority(HConstants.HIGH_QOS);

      try {
        getStub().scan(controller, request);
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
    } catch (IOException e) {
      TableName table = getTableName();
      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
      LOG.warn(
        "Ignore, probably already closed. Current scan: " + getScan().toString() + tableDetails, e);
    }
    this.scannerId = -1L;
  }

  /**
   * Opens a scanner against the current region and records the server-assigned scanner id (and
   * the mvcc read point, when the server returns one) for subsequent next() calls.
   */
  private ScanResponse openScanner() throws IOException {
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(
      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region "
          + getLocation().toString());
      }
      if (response.hasMvccReadPoint()) {
        this.scan.setMvccReadPoint(response.getMvccReadPoint());
      }
      this.scannerId = id;
      return response;
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * Indicate whether we make a call only to renew the lease, but without affected the scanner in
   * any other way.
   * @param val true if only the lease should be renewed
   */
  public void setRenew(boolean val) {
    this.renew = val;
  }

  /** Returns the HRegionInfo for the current region */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      // Location is only valid after prepare() has run at least once.
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on next
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  /**
   * Set the number of rows that will be fetched on next
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

  /**
   * Creates a copy of this callable aimed at the given replica id, inheriting the scan, metrics
   * and caching settings.
   */
  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), this.getScan(),
      this.scanMetrics, this.rpcControllerFactory, id);
    s.setCaching(this.caching);
    return s;
  }

  /**
   * Should the client attempt to fetch more results from this region
   */
  MoreResults moreResultsInRegion() {
    return moreResultsInRegion;
  }

  void setMoreResultsInRegion(MoreResults moreResults) {
    this.moreResultsInRegion = moreResults;
  }

  /**
   * Should the client attempt to fetch more results for the whole scan.
   */
  MoreResults moreResultsForScan() {
    return moreResultsForScan;
  }

  void setMoreResultsForScan(MoreResults moreResults) {
    this.moreResultsForScan = moreResults;
  }
}