001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 029import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 030import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 031 032import io.opentelemetry.api.trace.Span; 033import io.opentelemetry.api.trace.StatusCode; 034import io.opentelemetry.context.Scope; 035import java.io.IOException; 036import java.util.Map; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 043import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 044import org.apache.hadoop.hbase.ipc.HBaseRpcController; 045import org.apache.hadoop.hbase.trace.TraceUtil; 046import org.apache.yetus.audience.InterfaceAudience; 047 048import org.apache.hbase.thirdparty.io.netty.util.Timer; 049 050import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 055 056/** 057 * The asynchronous client scanner implementation. 058 * <p> 059 * Here we will call OpenScanner first and use the returned scannerId to create a 060 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of 061 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish 062 * scan. 063 */ 064@InterfaceAudience.Private 065class AsyncClientScanner { 066 067 // We will use this scan object during the whole scan operation. The 068 // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. 069 private final Scan scan; 070 071 private final ScanMetrics scanMetrics; 072 073 private final AdvancedScanResultConsumer consumer; 074 075 private final TableName tableName; 076 077 private final AsyncConnectionImpl conn; 078 079 private final Timer retryTimer; 080 081 private final long pauseNs; 082 083 private final long pauseNsForServerOverloaded; 084 085 private final int maxAttempts; 086 087 private final long scanTimeoutNs; 088 089 private final long rpcTimeoutNs; 090 091 private final int startLogErrorsCnt; 092 093 private final ScanResultCache resultCache; 094 095 private final Span span; 096 097 private final Map<String, byte[]> requestAttributes; 098 099 private final boolean isScanMetricsByRegionEnabled; 100 101 public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, 102 AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, 103 int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, 104 Map<String, byte[]> requestAttributes) { 105 if (scan.getStartRow() == null) { 106 scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); 107 } 108 if (scan.getStopRow() == null) { 109 scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow()); 110 } 111 this.scan = scan; 112 this.consumer = consumer; 113 this.tableName = tableName; 114 this.conn = conn; 115 this.retryTimer = retryTimer; 116 this.pauseNs = pauseNs; 117 this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; 118 this.maxAttempts = maxAttempts; 119 this.scanTimeoutNs = scanTimeoutNs; 120 this.rpcTimeoutNs = rpcTimeoutNs; 121 this.startLogErrorsCnt = startLogErrorsCnt; 122 this.resultCache = createScanResultCache(scan); 123 this.requestAttributes = requestAttributes; 124 boolean isScanMetricsByRegionEnabled = false; 125 if (scan.isScanMetricsEnabled()) { 126 this.scanMetrics = new ScanMetrics(); 127 consumer.onScanMetricsCreated(scanMetrics); 128 if (this.scan.isScanMetricsByRegionEnabled()) { 129 isScanMetricsByRegionEnabled = true; 130 } 131 } else { 132 this.scanMetrics = null; 133 } 134 this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled; 135 136 /* 137 * Assumes that the `start()` method is called immediately after construction. If this is no 138 * longer the case, for tracing correctness, we should move the start of the span into the 139 * `start()` method. The cost of doing so would be making access to the `span` safe for 140 * concurrent threads. 141 */ 142 span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build(); 143 if (consumer instanceof AsyncTableResultScanner) { 144 AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; 145 scanner.setSpan(span); 146 } 147 } 148 149 private static final class OpenScannerResponse { 150 151 public final HRegionLocation loc; 152 153 public final boolean isRegionServerRemote; 154 155 public final ClientService.Interface stub; 156 157 public final HBaseRpcController controller; 158 159 public final ScanResponse resp; 160 161 public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, 162 HBaseRpcController controller, ScanResponse resp) { 163 this.loc = loc; 164 this.isRegionServerRemote = isRegionServerRemote; 165 this.stub = stub; 166 this.controller = controller; 167 this.resp = resp; 168 } 169 } 170 171 private final AtomicInteger openScannerTries = new AtomicInteger(); 172 173 private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, 174 HRegionLocation loc, ClientService.Interface stub) { 175 try (Scope ignored = span.makeCurrent()) { 176 boolean isRegionServerRemote = isRemote(loc.getHostname()); 177 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 178 if (openScannerTries.getAndIncrement() > 1) { 179 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 180 } 181 CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); 182 try { 183 ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), 184 scan, scan.getCaching(), false); 185 stub.scan(controller, request, resp -> { 186 try (Scope ignored1 = span.makeCurrent()) { 187 if (controller.failed()) { 188 final IOException e = controller.getFailed(); 189 future.completeExceptionally(e); 190 TraceUtil.setError(span, e); 191 span.end(); 192 return; 193 } 194 future 195 .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); 196 } 197 }); 198 } catch (IOException e) { 199 // span is closed by listener attached to the Future in `openScanner()` 200 future.completeExceptionally(e); 201 } 202 return future; 203 } 204 } 205 206 // lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan metrics 207 private void startScan(OpenScannerResponse resp, long lastNextCallNanos) { 208 AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller = 209 conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) 210 .remote(resp.isRegionServerRemote) 211 .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) 212 .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) 213 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 214 .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 215 .lastNextCallNanos(lastNextCallNanos) 216 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 217 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 218 .setRequestAttributes(requestAttributes).build(); 219 addListener(scanSingleRegionCaller.start(resp.controller, resp.resp), (hasMore, error) -> { 220 try (Scope ignored = span.makeCurrent()) { 221 if (error != null) { 222 try { 223 consumer.onError(error); 224 return; 225 } finally { 226 TraceUtil.setError(span, error); 227 span.end(); 228 } 229 } 230 if (hasMore) { 231 openScanner(scanSingleRegionCaller); 232 } else { 233 try { 234 consumer.onComplete(); 235 } finally { 236 span.setStatus(StatusCode.OK); 237 span.end(); 238 } 239 } 240 } 241 }); 242 } 243 244 private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { 245 try (Scope ignored = span.makeCurrent()) { 246 return conn.callerFactory.<OpenScannerResponse> single().table(tableName) 247 .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) 248 .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 249 .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 250 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 251 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 252 .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call(); 253 } 254 } 255 256 private long getPrimaryTimeoutNs() { 257 return TableName.isMetaTableName(tableName) 258 ? conn.connConf.getPrimaryMetaScanTimeoutNs() 259 : conn.connConf.getPrimaryScanTimeoutNs(); 260 } 261 262 private void openScanner(AsyncScanSingleRegionRpcRetryingCaller previousScanSingleRegionCaller) { 263 if (this.isScanMetricsByRegionEnabled) { 264 scanMetrics.moveToNextRegion(); 265 } 266 incRegionCountMetrics(scanMetrics); 267 long openScannerStartNanos = System.nanoTime(); 268 if (previousScanSingleRegionCaller != null) { 269 // open scanner is also a next call 270 incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS 271 .toMillis(openScannerStartNanos - previousScanSingleRegionCaller.getLastNextCallNanos())); 272 } 273 openScannerTries.set(1); 274 addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), 275 getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, 276 conn.getConnectionMetrics()), (resp, error) -> { 277 try (Scope ignored = span.makeCurrent()) { 278 if (error != null) { 279 try { 280 consumer.onError(error); 281 return; 282 } finally { 283 TraceUtil.setError(span, error); 284 span.end(); 285 } 286 } 287 if (this.isScanMetricsByRegionEnabled) { 288 HRegionLocation loc = resp.loc; 289 this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(), 290 loc.getServerName()); 291 } 292 startScan(resp, openScannerStartNanos); 293 } 294 }); 295 } 296 297 public void start() { 298 try (Scope ignored = span.makeCurrent()) { 299 openScanner(null); 300 } 301 } 302}