/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Scanner operations such as create, next, etc.
 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
 * {@link RpcRetryingCaller} so fails are retried.
 * <p>
 * A single instance tracks one server-side scanner through {@link #scannerId}: a value of
 * {@code -1} means no scanner is currently open, so the next {@link #rpcCall()} opens one.
 */
@InterfaceAudience.Private
public class ScannerCallable extends ClientServiceCallable<Result[]> {
  /** Configuration key for the latency (in ms) above which a fetch is logged; default 1000. */
  public static final String LOG_SCANNER_LATENCY_CUTOFF
    = "hbase.client.log.scanner.latency.cutoff";
  /** Configuration key that switches on scanner activity logging; default false. */
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
  /** Id of the currently open scanner, or -1 when none is open. */
  protected long scannerId = -1L;
  /** Set once {@link #prepare(boolean)} has resolved a location at least once. */
  protected boolean instantiated = false;
  /** When true, the next {@link #rpcCall()} closes the scanner instead of fetching. */
  protected boolean closed = false;
  /** Passed through to the scan request built in {@code next()}; see {@link #setRenew}. */
  protected boolean renew = false;
  protected final Scan scan;
  private int caching = 1;
  /** May be null, in which case no metrics are requested (see constructor docs). */
  protected ScanMetrics scanMetrics;
  // Both logging settings are read once from the connection's configuration in the constructor.
  private final boolean logScannerActivity;
  private final int logCutOffLatency;
  /** The replica id this callable targets. */
  protected final int id;

  enum MoreResults {
    YES, NO, UNKNOWN
  }

  // Both remain null until the first response has been processed by rpcCall().
  private MoreResults moreResultsInRegion;
  private MoreResults moreResultsForScan;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
   */
  protected boolean heartbeatMessage = false;

  /** Cursor taken from the last heartbeat response, or null; cleared by {@link #prepare}. */
  protected Cursor cursor;

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  // Sequence number sent with each next() request; incremented after every successful call.
  private long nextCallSeq = 0;
  protected final RpcControllerFactory rpcControllerFactory;

  /**
   * Creates a callable for replica id 0.
   * @param connection which connection
   * @param tableName table callable is on
   * @param scan the scan to execute
   * @param scanMetrics the ScanMetrics to use, if it is null, ScannerCallable won't collect
   *          metrics
   * @param rpcControllerFactory factory to use when creating
   *          {@link com.google.protobuf.RpcController}
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
    this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
  }

  /**
   * Creates a callable for the given replica.
   * @param connection which connection; its configuration supplies the scanner logging settings
   * @param tableName table callable is on
   * @param scan the scan to execute; its start row and priority are handed to the superclass
   * @param scanMetrics the ScanMetrics to use, if it is null, ScannerCallable won't collect
   *          metrics
   * @param rpcControllerFactory factory used to create the RPC controller for this callable
   * @param id the replicaId
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
      ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
        scan.getPriority());
    this.id = id;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    this.logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    this.logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.rpcControllerFactory = rpcControllerFactory;
  }

  /**
   * Resolves the location of this callable's replica and binds the client stub to its server.
   * @param reload force reload of server location
   * @throws InterruptedIOException if the calling thread has been interrupted
   * @throws HBaseIOException if no location (or no server name) is known for the replica
   * @throws IOException if the region locations cannot be obtained
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
        id, getConnection(), getTableName(), getRow());
    location = id < rl.size() ? rl.getRegionLocation(id) : null;
    if (location == null || location.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      // when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }
    cursor = null;
    // check how often we retry.
    if (reload) {
      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
    }
  }

  /**
   * compare the local machine hostname with region server's hostname to decide if hbase client
   * connects to a remote region server
   */
  protected void checkIfRegionServerIsRemote() {
    isRegionServerRemote = isRemote(getLocation().getHostname());
  }

  /**
   * Issues a "next" request against the already open scanner.
   * @return the server's response
   * @throws DoNotRetryIOException wrapping a {@link NotServingRegionException} or a
   *           {@link RegionServerStoppedException}, to make the caller re-setup the scanner
   * @throws IOException any other failure, as translated by
   *           {@link ProtobufUtil#handleRemoteException(Exception)}
   */
  private ScanResponse next() throws IOException {
    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
    setHeartbeatMessage(false);
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
      this.scanMetrics != null, renew, scan.getLimit());
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      nextCallSeq++;
      return response;
    } catch (Exception e) {
      IOException ioe = ProtobufUtil.handleRemoteException(e);
      if (logScannerActivity) {
        LOG.info("Got exception making request {} to {}", ProtobufUtil.toText(request),
          getLocation(), e);
        if (ioe instanceof UnknownScannerException) {
          try {
            // Looked up purely so that the log line can say where the region lives now.
            HRegionLocation currentLocation =
                getConnection().relocateRegion(getTableName(), scan.getStartRow());
            LOG.info("Scanner={} expired, current region location is {}", scannerId,
              currentLocation);
          } catch (Throwable t) {
            LOG.info("Failed to relocate region", t);
          }
        } else if (ioe instanceof ScannerResetException) {
          LOG.info("Scanner={} has received an exception, and the server "
              + "asked us to reset the scanner state.", scannerId, ioe);
        }
      }
      // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
      // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
      // a scan when doing a next in particular, we want to break out and get the scanner to
      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (it's ugly,
      // yeah and hard to follow and in need of a refactor).
      if (ioe instanceof NotServingRegionException) {
        // Throw a DNRE so that we break out of cycle of calling NSRE
        // when what we need is to open scanner against new location.
        // Attach NSRE to signal client that it needs to re-setup scanner.
        if (this.scanMetrics != null) {
          this.scanMetrics.countOfNSRE.incrementAndGet();
        }
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else if (ioe instanceof RegionServerStoppedException) {
        // Throw a DNRE so that we break out of cycle of the retries and instead go and
        // open scanner against new location.
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else {
        // The outer layers will retry
        throw ioe;
      }
    }
  }

  /** Records that the scanner is finished on the server: forget its id and mark us closed. */
  private void setAlreadyClosed() {
    this.scannerId = -1L;
    this.closed = true;
  }

  /**
   * Performs one scanner RPC: a close if {@link #setClose()} was called, an open if no scanner
   * is open yet, otherwise a next.
   * @return the results of the call, or null when the call only closed the scanner
   * @throws InterruptedIOException if the calling thread has been interrupted
   */
  @Override
  protected Result[] rpcCall() throws Exception {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      close();
      return null;
    }
    ScanResponse response;
    if (this.scannerId == -1L) {
      response = openScanner();
    } else {
      response = next();
    }
    // NOTE(review): this timestamp is taken after the scan RPC has returned, so the duration
    // logged below covers only turning the response into Results, not the RPC itself, even
    // though the message reads "to fetch" -- confirm this is intended.
    long timestamp = System.currentTimeMillis();
    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
    setHeartbeatMessage(isHeartBeat);
    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
      cursor = ProtobufUtil.toCursor(response.getCursor());
    }
    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
    if (logScannerActivity) {
      long now = System.currentTimeMillis();
      if (now - timestamp > logCutOffLatency) {
        int rows = rrs == null ? 0 : rrs.length;
        LOG.info("Took {}ms to fetch {} rows from scanner={}", now - timestamp, rows, scannerId);
      }
    }
    updateServerSideMetrics(scanMetrics, response);
    // moreResults is only used for the case where a filter exhausts all elements
    if (response.hasMoreResults()) {
      if (response.getMoreResults()) {
        setMoreResultsForScan(MoreResults.YES);
      } else {
        setMoreResultsForScan(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsForScan(MoreResults.UNKNOWN);
    }
    if (response.hasMoreResultsInRegion()) {
      if (response.getMoreResultsInRegion()) {
        setMoreResultsInRegion(MoreResults.YES);
      } else {
        setMoreResultsInRegion(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsInRegion(MoreResults.UNKNOWN);
    }
    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
    return rrs;
  }

  /**
   * @return true when the most recent RPC response indicated that the response was a heartbeat
   *         message. Heartbeat messages are sent back from the server when the processing of the
   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
   *         timeouts during long running scan operations.
   */
  boolean isHeartbeatMessage() {
    return heartbeatMessage;
  }

  /**
   * @return the cursor taken from the most recent heartbeat response, or null if there is none
   */
  public Cursor getCursor() {
    return cursor;
  }

  private void setHeartbeatMessage(boolean heartbeatMessage) {
    this.heartbeatMessage = heartbeatMessage;
  }

  /**
   * Sends a close request for the open scanner, if there is one. This is best effort: an
   * IOException from the request is logged and otherwise ignored, and the scanner id is reset
   * to -1 either way.
   */
  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
      ScanRequest request =
          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
      try {
        getStub().scan(getRpcController(), request);
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
    } catch (IOException e) {
      TableName table = getTableName();
      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
      LOG.warn("Ignore, probably already closed. Current scan: {}{}", getScan(),
        tableDetails, e);
    }
    this.scannerId = -1L;
  }

  /**
   * Opens a scanner on the region at the current location and remembers its id.
   * If the response carries an mvcc read point, it is stored on the {@link Scan}.
   * @return the server's response to the open request
   * @throws IOException if the request fails, as translated by
   *           {@link ProtobufUtil#handleRemoteException(Exception)}
   */
  private ScanResponse openScanner() throws IOException {
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(
      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      // Named so that it cannot be confused with the replica id field "id".
      long newScannerId = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner={} for scan={} on region {}", newScannerId, scan, getLocation());
      }
      if (response.hasMvccReadPoint()) {
        this.scan.setMvccReadPoint(response.getMvccReadPoint());
      }
      this.scannerId = newScannerId;
      return response;
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * Indicate whether we make a call only to renew the lease, but without affecting the scanner
   * in any other way.
   * @param val true if only the lease should be renewed
   */
  public void setRenew(boolean val) {
    this.renew = val;
  }

  /**
   * @return the HRegionInfo for the current region, or null if {@link #prepare(boolean)} has
   *         not completed yet
   */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on next
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  /**
   * Set the number of rows that will be fetched on next
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

  /**
   * Creates a callable for another replica that shares this callable's connection, table,
   * scan, metrics, controller factory and caching value.
   * @param id the replicaId the new callable should target
   * @return a new callable; this instance is left untouched
   */
  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(),
        this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
    s.setCaching(this.caching);
    return s;
  }

  /**
   * Should the client attempt to fetch more results from this region
   */
  MoreResults moreResultsInRegion() {
    return moreResultsInRegion;
  }

  void setMoreResultsInRegion(MoreResults moreResults) {
    this.moreResultsInRegion = moreResults;
  }

  /**
   * Should the client attempt to fetch more results for the whole scan.
   */
  MoreResults moreResultsForScan() {
    return moreResultsForScan;
  }

  void setMoreResultsForScan(MoreResults moreResults) {
    this.moreResultsForScan = moreResults;
  }
}