001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import io.opentelemetry.api.trace.StatusCode;
033import io.opentelemetry.context.Scope;
034import java.io.IOException;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
041import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
042import org.apache.hadoop.hbase.ipc.HBaseRpcController;
043import org.apache.hadoop.hbase.trace.TraceUtil;
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.apache.hbase.thirdparty.io.netty.util.Timer;
047
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
053
054/**
055 * The asynchronous client scanner implementation.
056 * <p>
057 * Here we will call OpenScanner first and use the returned scannerId to create a
058 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
059 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
060 * scan.
061 */
062@InterfaceAudience.Private
063class AsyncClientScanner {
064
065  // We will use this scan object during the whole scan operation. The
066  // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
067  private final Scan scan;
068
069  private final ScanMetrics scanMetrics;
070
071  private final AdvancedScanResultConsumer consumer;
072
073  private final TableName tableName;
074
075  private final AsyncConnectionImpl conn;
076
077  private final Timer retryTimer;
078
079  private final long pauseNs;
080
081  private final long pauseNsForServerOverloaded;
082
083  private final int maxAttempts;
084
085  private final long scanTimeoutNs;
086
087  private final long rpcTimeoutNs;
088
089  private final int startLogErrorsCnt;
090
091  private final ScanResultCache resultCache;
092
093  private final Span span;
094
095  public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
096    AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
097    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
098    if (scan.getStartRow() == null) {
099      scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
100    }
101    if (scan.getStopRow() == null) {
102      scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
103    }
104    this.scan = scan;
105    this.consumer = consumer;
106    this.tableName = tableName;
107    this.conn = conn;
108    this.retryTimer = retryTimer;
109    this.pauseNs = pauseNs;
110    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
111    this.maxAttempts = maxAttempts;
112    this.scanTimeoutNs = scanTimeoutNs;
113    this.rpcTimeoutNs = rpcTimeoutNs;
114    this.startLogErrorsCnt = startLogErrorsCnt;
115    this.resultCache = createScanResultCache(scan);
116    if (scan.isScanMetricsEnabled()) {
117      this.scanMetrics = new ScanMetrics();
118      consumer.onScanMetricsCreated(scanMetrics);
119    } else {
120      this.scanMetrics = null;
121    }
122
123    /*
124     * Assumes that the `start()` method is called immediately after construction. If this is no
125     * longer the case, for tracing correctness, we should move the start of the span into the
126     * `start()` method. The cost of doing so would be making access to the `span` safe for
127     * concurrent threads.
128     */
129    span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build();
130    if (consumer instanceof AsyncTableResultScanner) {
131      AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
132      scanner.setSpan(span);
133    }
134  }
135
136  private static final class OpenScannerResponse {
137
138    public final HRegionLocation loc;
139
140    public final boolean isRegionServerRemote;
141
142    public final ClientService.Interface stub;
143
144    public final HBaseRpcController controller;
145
146    public final ScanResponse resp;
147
148    public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
149      HBaseRpcController controller, ScanResponse resp) {
150      this.loc = loc;
151      this.isRegionServerRemote = isRegionServerRemote;
152      this.stub = stub;
153      this.controller = controller;
154      this.resp = resp;
155    }
156  }
157
158  private final AtomicInteger openScannerTries = new AtomicInteger();
159
160  private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
161    HRegionLocation loc, ClientService.Interface stub) {
162    try (Scope ignored = span.makeCurrent()) {
163      boolean isRegionServerRemote = isRemote(loc.getHostname());
164      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
165      if (openScannerTries.getAndIncrement() > 1) {
166        incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
167      }
168      CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
169      try {
170        ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
171          scan, scan.getCaching(), false);
172        stub.scan(controller, request, resp -> {
173          try (Scope ignored1 = span.makeCurrent()) {
174            if (controller.failed()) {
175              final IOException e = controller.getFailed();
176              future.completeExceptionally(e);
177              TraceUtil.setError(span, e);
178              span.end();
179              return;
180            }
181            future
182              .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
183          }
184        });
185      } catch (IOException e) {
186        // span is closed by listener attached to the Future in `openScanner()`
187        future.completeExceptionally(e);
188      }
189      return future;
190    }
191  }
192
193  private void startScan(OpenScannerResponse resp) {
194    addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
195      .location(resp.loc).remote(resp.isRegionServerRemote)
196      .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
197      .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
198      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
199      .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
200      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
201      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
202      .start(resp.controller, resp.resp), (hasMore, error) -> {
203        try (Scope ignored = span.makeCurrent()) {
204          if (error != null) {
205            try {
206              consumer.onError(error);
207              return;
208            } finally {
209              TraceUtil.setError(span, error);
210              span.end();
211            }
212          }
213          if (hasMore) {
214            openScanner();
215          } else {
216            try {
217              consumer.onComplete();
218            } finally {
219              span.setStatus(StatusCode.OK);
220              span.end();
221            }
222          }
223        }
224      });
225  }
226
227  private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
228    try (Scope ignored = span.makeCurrent()) {
229      return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
230        .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
231        .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
232        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
233        .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
234        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
235        .call();
236    }
237  }
238
239  private long getPrimaryTimeoutNs() {
240    return TableName.isMetaTableName(tableName)
241      ? conn.connConf.getPrimaryMetaScanTimeoutNs()
242      : conn.connConf.getPrimaryScanTimeoutNs();
243  }
244
245  private void openScanner() {
246    incRegionCountMetrics(scanMetrics);
247    openScannerTries.set(1);
248    addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
249      getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
250      conn.getConnectionMetrics()), (resp, error) -> {
251        try (Scope ignored = span.makeCurrent()) {
252          if (error != null) {
253            try {
254              consumer.onError(error);
255              return;
256            } finally {
257              TraceUtil.setError(span, error);
258              span.end();
259            }
260          }
261          startScan(resp);
262        }
263      });
264  }
265
266  public void start() {
267    try (Scope ignored = span.makeCurrent()) {
268      openScanner();
269    }
270  }
271}