001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import io.opentelemetry.api.trace.StatusCode; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 041import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 042import org.apache.hadoop.hbase.ipc.HBaseRpcController; 043import org.apache.hadoop.hbase.trace.TraceUtil; 044import org.apache.yetus.audience.InterfaceAudience; 045 046import org.apache.hbase.thirdparty.io.netty.util.Timer; 047 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 053 054/** 055 * The asynchronous client scanner implementation. 056 * <p> 057 * Here we will call OpenScanner first and use the returned scannerId to create a 058 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of 059 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish 060 * scan. 061 */ 062@InterfaceAudience.Private 063class AsyncClientScanner { 064 065 // We will use this scan object during the whole scan operation. The 066 // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. 067 private final Scan scan; 068 069 private final ScanMetrics scanMetrics; 070 071 private final AdvancedScanResultConsumer consumer; 072 073 private final TableName tableName; 074 075 private final AsyncConnectionImpl conn; 076 077 private final Timer retryTimer; 078 079 private final long pauseNs; 080 081 private final long pauseNsForServerOverloaded; 082 083 private final int maxAttempts; 084 085 private final long scanTimeoutNs; 086 087 private final long rpcTimeoutNs; 088 089 private final int startLogErrorsCnt; 090 091 private final ScanResultCache resultCache; 092 093 private final Span span; 094 095 public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, 096 AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, 097 int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 098 if (scan.getStartRow() == null) { 099 scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); 100 } 101 if (scan.getStopRow() == null) { 102 scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow()); 103 } 104 this.scan = scan; 105 this.consumer = consumer; 106 this.tableName = tableName; 107 this.conn = conn; 108 this.retryTimer = retryTimer; 109 this.pauseNs = pauseNs; 110 this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; 111 this.maxAttempts = maxAttempts; 112 this.scanTimeoutNs = scanTimeoutNs; 113 this.rpcTimeoutNs = rpcTimeoutNs; 114 this.startLogErrorsCnt = startLogErrorsCnt; 115 this.resultCache = createScanResultCache(scan); 116 if (scan.isScanMetricsEnabled()) { 117 this.scanMetrics = new ScanMetrics(); 118 consumer.onScanMetricsCreated(scanMetrics); 119 } else { 120 this.scanMetrics = null; 121 } 122 123 /* 124 * Assumes that the `start()` method is called immediately after construction. If this is no 125 * longer the case, for tracing correctness, we should move the start of the span into the 126 * `start()` method. The cost of doing so would be making access to the `span` safe for 127 * concurrent threads. 128 */ 129 span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build(); 130 if (consumer instanceof AsyncTableResultScanner) { 131 AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; 132 scanner.setSpan(span); 133 } 134 } 135 136 private static final class OpenScannerResponse { 137 138 public final HRegionLocation loc; 139 140 public final boolean isRegionServerRemote; 141 142 public final ClientService.Interface stub; 143 144 public final HBaseRpcController controller; 145 146 public final ScanResponse resp; 147 148 public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, 149 HBaseRpcController controller, ScanResponse resp) { 150 this.loc = loc; 151 this.isRegionServerRemote = isRegionServerRemote; 152 this.stub = stub; 153 this.controller = controller; 154 this.resp = resp; 155 } 156 } 157 158 private final AtomicInteger openScannerTries = new AtomicInteger(); 159 160 private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, 161 HRegionLocation loc, ClientService.Interface stub) { 162 try (Scope ignored = span.makeCurrent()) { 163 boolean isRegionServerRemote = isRemote(loc.getHostname()); 164 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 165 if (openScannerTries.getAndIncrement() > 1) { 166 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 167 } 168 CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); 169 try { 170 ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), 171 scan, scan.getCaching(), false); 172 stub.scan(controller, request, resp -> { 173 try (Scope ignored1 = span.makeCurrent()) { 174 if (controller.failed()) { 175 final IOException e = controller.getFailed(); 176 future.completeExceptionally(e); 177 TraceUtil.setError(span, e); 178 span.end(); 179 return; 180 } 181 future 182 .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); 183 } 184 }); 185 } catch (IOException e) { 186 // span is closed by listener attached to the Future in `openScanner()` 187 future.completeExceptionally(e); 188 } 189 return future; 190 } 191 } 192 193 private void startScan(OpenScannerResponse resp) { 194 addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) 195 .location(resp.loc).remote(resp.isRegionServerRemote) 196 .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) 197 .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) 198 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 199 .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 200 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 201 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 202 .start(resp.controller, resp.resp), (hasMore, error) -> { 203 try (Scope ignored = span.makeCurrent()) { 204 if (error != null) { 205 try { 206 consumer.onError(error); 207 return; 208 } finally { 209 TraceUtil.setError(span, error); 210 span.end(); 211 } 212 } 213 if (hasMore) { 214 openScanner(); 215 } else { 216 try { 217 consumer.onComplete(); 218 } finally { 219 span.setStatus(StatusCode.OK); 220 span.end(); 221 } 222 } 223 } 224 }); 225 } 226 227 private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { 228 try (Scope ignored = span.makeCurrent()) { 229 return conn.callerFactory.<OpenScannerResponse> single().table(tableName) 230 .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) 231 .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 232 .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 233 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 234 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) 235 .call(); 236 } 237 } 238 239 private long getPrimaryTimeoutNs() { 240 return TableName.isMetaTableName(tableName) 241 ? conn.connConf.getPrimaryMetaScanTimeoutNs() 242 : conn.connConf.getPrimaryScanTimeoutNs(); 243 } 244 245 private void openScanner() { 246 incRegionCountMetrics(scanMetrics); 247 openScannerTries.set(1); 248 addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), 249 getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, 250 conn.getConnectionMetrics()), (resp, error) -> { 251 try (Scope ignored = span.makeCurrent()) { 252 if (error != null) { 253 try { 254 consumer.onError(error); 255 return; 256 } finally { 257 TraceUtil.setError(span, error); 258 span.end(); 259 } 260 } 261 startScan(resp); 262 } 263 }); 264 } 265 266 public void start() { 267 try (Scope ignored = span.makeCurrent()) { 268 openScanner(); 269 } 270 } 271}