001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import io.opentelemetry.api.trace.StatusCode; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.util.Map; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 042import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 043import org.apache.hadoop.hbase.ipc.HBaseRpcController; 044import org.apache.hadoop.hbase.trace.TraceUtil; 045import org.apache.yetus.audience.InterfaceAudience; 046 047import org.apache.hbase.thirdparty.io.netty.util.Timer; 048 049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 054 055/** 056 * The asynchronous client scanner implementation. 057 * <p> 058 * Here we will call OpenScanner first and use the returned scannerId to create a 059 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of 060 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish 061 * scan. 062 */ 063@InterfaceAudience.Private 064class AsyncClientScanner { 065 066 // We will use this scan object during the whole scan operation. The 067 // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. 068 private final Scan scan; 069 070 private final ScanMetrics scanMetrics; 071 072 private final AdvancedScanResultConsumer consumer; 073 074 private final TableName tableName; 075 076 private final AsyncConnectionImpl conn; 077 078 private final Timer retryTimer; 079 080 private final long pauseNs; 081 082 private final long pauseNsForServerOverloaded; 083 084 private final int maxAttempts; 085 086 private final long scanTimeoutNs; 087 088 private final long rpcTimeoutNs; 089 090 private final int startLogErrorsCnt; 091 092 private final ScanResultCache resultCache; 093 094 private final Span span; 095 096 private final Map<String, byte[]> requestAttributes; 097 098 private final boolean isScanMetricsByRegionEnabled; 099 100 public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, 101 AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, 102 int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, 103 Map<String, byte[]> requestAttributes) { 104 if (scan.getStartRow() == null) { 105 scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); 106 } 107 if (scan.getStopRow() == null) { 108 scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow()); 109 } 110 this.scan = scan; 111 this.consumer = consumer; 112 this.tableName = tableName; 113 this.conn = conn; 114 this.retryTimer = retryTimer; 115 this.pauseNs = pauseNs; 116 this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; 117 this.maxAttempts = maxAttempts; 118 this.scanTimeoutNs = scanTimeoutNs; 119 this.rpcTimeoutNs = rpcTimeoutNs; 120 this.startLogErrorsCnt = startLogErrorsCnt; 121 this.resultCache = createScanResultCache(scan); 122 this.requestAttributes = requestAttributes; 123 boolean isScanMetricsByRegionEnabled = false; 124 if (scan.isScanMetricsEnabled()) { 125 this.scanMetrics = new ScanMetrics(); 126 consumer.onScanMetricsCreated(scanMetrics); 127 if (this.scan.isScanMetricsByRegionEnabled()) { 128 isScanMetricsByRegionEnabled = true; 129 } 130 } else { 131 this.scanMetrics = null; 132 } 133 this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled; 134 135 /* 136 * Assumes that the `start()` method is called immediately after construction. If this is no 137 * longer the case, for tracing correctness, we should move the start of the span into the 138 * `start()` method. The cost of doing so would be making access to the `span` safe for 139 * concurrent threads. 140 */ 141 span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build(); 142 if (consumer instanceof AsyncTableResultScanner) { 143 AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; 144 scanner.setSpan(span); 145 } 146 } 147 148 private static final class OpenScannerResponse { 149 150 public final HRegionLocation loc; 151 152 public final boolean isRegionServerRemote; 153 154 public final ClientService.Interface stub; 155 156 public final HBaseRpcController controller; 157 158 public final ScanResponse resp; 159 160 public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, 161 HBaseRpcController controller, ScanResponse resp) { 162 this.loc = loc; 163 this.isRegionServerRemote = isRegionServerRemote; 164 this.stub = stub; 165 this.controller = controller; 166 this.resp = resp; 167 } 168 } 169 170 private final AtomicInteger openScannerTries = new AtomicInteger(); 171 172 private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, 173 HRegionLocation loc, ClientService.Interface stub) { 174 try (Scope ignored = span.makeCurrent()) { 175 boolean isRegionServerRemote = isRemote(loc.getHostname()); 176 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 177 if (openScannerTries.getAndIncrement() > 1) { 178 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 179 } 180 CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); 181 try { 182 ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), 183 scan, scan.getCaching(), false); 184 stub.scan(controller, request, resp -> { 185 try (Scope ignored1 = span.makeCurrent()) { 186 if (controller.failed()) { 187 final IOException e = controller.getFailed(); 188 future.completeExceptionally(e); 189 TraceUtil.setError(span, e); 190 span.end(); 191 return; 192 } 193 future 194 .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); 195 } 196 }); 197 } catch (IOException e) { 198 // span is closed by listener attached to the Future in `openScanner()` 199 future.completeExceptionally(e); 200 } 201 return future; 202 } 203 } 204 205 private void startScan(OpenScannerResponse resp) { 206 addListener( 207 conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) 208 .remote(resp.isRegionServerRemote) 209 .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) 210 .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) 211 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 212 .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 213 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 214 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 215 .setRequestAttributes(requestAttributes).start(resp.controller, resp.resp), 216 (hasMore, error) -> { 217 try (Scope ignored = span.makeCurrent()) { 218 if (error != null) { 219 try { 220 consumer.onError(error); 221 return; 222 } finally { 223 TraceUtil.setError(span, error); 224 span.end(); 225 } 226 } 227 if (hasMore) { 228 openScanner(); 229 } else { 230 try { 231 consumer.onComplete(); 232 } finally { 233 span.setStatus(StatusCode.OK); 234 span.end(); 235 } 236 } 237 } 238 }); 239 } 240 241 private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { 242 try (Scope ignored = span.makeCurrent()) { 243 return conn.callerFactory.<OpenScannerResponse> single().table(tableName) 244 .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) 245 .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 246 .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 247 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 248 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 249 .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call(); 250 } 251 } 252 253 private long getPrimaryTimeoutNs() { 254 return TableName.isMetaTableName(tableName) 255 ? conn.connConf.getPrimaryMetaScanTimeoutNs() 256 : conn.connConf.getPrimaryScanTimeoutNs(); 257 } 258 259 private void openScanner() { 260 if (this.isScanMetricsByRegionEnabled) { 261 scanMetrics.moveToNextRegion(); 262 } 263 incRegionCountMetrics(scanMetrics); 264 openScannerTries.set(1); 265 addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), 266 getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, 267 conn.getConnectionMetrics()), (resp, error) -> { 268 try (Scope ignored = span.makeCurrent()) { 269 if (error != null) { 270 try { 271 consumer.onError(error); 272 return; 273 } finally { 274 TraceUtil.setError(span, error); 275 span.end(); 276 } 277 } 278 if (this.isScanMetricsByRegionEnabled) { 279 HRegionLocation loc = resp.loc; 280 this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(), 281 loc.getServerName()); 282 } 283 startScan(resp); 284 } 285 }); 286 } 287 288 public void start() { 289 try (Scope ignored = span.makeCurrent()) { 290 openScanner(); 291 } 292 } 293}