001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import io.opentelemetry.api.trace.StatusCode;
033import io.opentelemetry.context.Scope;
034import java.io.IOException;
035import java.util.Map;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
042import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
043import org.apache.hadoop.hbase.ipc.HBaseRpcController;
044import org.apache.hadoop.hbase.trace.TraceUtil;
045import org.apache.yetus.audience.InterfaceAudience;
046
047import org.apache.hbase.thirdparty.io.netty.util.Timer;
048
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
054
055/**
056 * The asynchronous client scanner implementation.
057 * <p>
058 * Here we will call OpenScanner first and use the returned scannerId to create a
059 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
060 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
061 * scan.
062 */
063@InterfaceAudience.Private
064class AsyncClientScanner {
065
066  // We will use this scan object during the whole scan operation. The
067  // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
068  private final Scan scan;
069
070  private final ScanMetrics scanMetrics;
071
072  private final AdvancedScanResultConsumer consumer;
073
074  private final TableName tableName;
075
076  private final AsyncConnectionImpl conn;
077
078  private final Timer retryTimer;
079
080  private final long pauseNs;
081
082  private final long pauseNsForServerOverloaded;
083
084  private final int maxAttempts;
085
086  private final long scanTimeoutNs;
087
088  private final long rpcTimeoutNs;
089
090  private final int startLogErrorsCnt;
091
092  private final ScanResultCache resultCache;
093
094  private final Span span;
095
096  private final Map<String, byte[]> requestAttributes;
097
098  private final boolean isScanMetricsByRegionEnabled;
099
100  public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
101    AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
102    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
103    Map<String, byte[]> requestAttributes) {
104    if (scan.getStartRow() == null) {
105      scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
106    }
107    if (scan.getStopRow() == null) {
108      scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
109    }
110    this.scan = scan;
111    this.consumer = consumer;
112    this.tableName = tableName;
113    this.conn = conn;
114    this.retryTimer = retryTimer;
115    this.pauseNs = pauseNs;
116    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
117    this.maxAttempts = maxAttempts;
118    this.scanTimeoutNs = scanTimeoutNs;
119    this.rpcTimeoutNs = rpcTimeoutNs;
120    this.startLogErrorsCnt = startLogErrorsCnt;
121    this.resultCache = createScanResultCache(scan);
122    this.requestAttributes = requestAttributes;
123    boolean isScanMetricsByRegionEnabled = false;
124    if (scan.isScanMetricsEnabled()) {
125      this.scanMetrics = new ScanMetrics();
126      consumer.onScanMetricsCreated(scanMetrics);
127      if (this.scan.isScanMetricsByRegionEnabled()) {
128        isScanMetricsByRegionEnabled = true;
129      }
130    } else {
131      this.scanMetrics = null;
132    }
133    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
134
135    /*
136     * Assumes that the `start()` method is called immediately after construction. If this is no
137     * longer the case, for tracing correctness, we should move the start of the span into the
138     * `start()` method. The cost of doing so would be making access to the `span` safe for
139     * concurrent threads.
140     */
141    span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build();
142    if (consumer instanceof AsyncTableResultScanner) {
143      AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
144      scanner.setSpan(span);
145    }
146  }
147
148  private static final class OpenScannerResponse {
149
150    public final HRegionLocation loc;
151
152    public final boolean isRegionServerRemote;
153
154    public final ClientService.Interface stub;
155
156    public final HBaseRpcController controller;
157
158    public final ScanResponse resp;
159
160    public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
161      HBaseRpcController controller, ScanResponse resp) {
162      this.loc = loc;
163      this.isRegionServerRemote = isRegionServerRemote;
164      this.stub = stub;
165      this.controller = controller;
166      this.resp = resp;
167    }
168  }
169
170  private final AtomicInteger openScannerTries = new AtomicInteger();
171
172  private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
173    HRegionLocation loc, ClientService.Interface stub) {
174    try (Scope ignored = span.makeCurrent()) {
175      boolean isRegionServerRemote = isRemote(loc.getHostname());
176      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
177      if (openScannerTries.getAndIncrement() > 1) {
178        incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
179      }
180      CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
181      try {
182        ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
183          scan, scan.getCaching(), false);
184        stub.scan(controller, request, resp -> {
185          try (Scope ignored1 = span.makeCurrent()) {
186            if (controller.failed()) {
187              final IOException e = controller.getFailed();
188              future.completeExceptionally(e);
189              TraceUtil.setError(span, e);
190              span.end();
191              return;
192            }
193            future
194              .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
195          }
196        });
197      } catch (IOException e) {
198        // span is closed by listener attached to the Future in `openScanner()`
199        future.completeExceptionally(e);
200      }
201      return future;
202    }
203  }
204
205  private void startScan(OpenScannerResponse resp) {
206    addListener(
207      conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
208        .remote(resp.isRegionServerRemote)
209        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
210        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
211        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
212        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
213        .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
214        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
215        .setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
216      (hasMore, error) -> {
217        try (Scope ignored = span.makeCurrent()) {
218          if (error != null) {
219            try {
220              consumer.onError(error);
221              return;
222            } finally {
223              TraceUtil.setError(span, error);
224              span.end();
225            }
226          }
227          if (hasMore) {
228            openScanner();
229          } else {
230            try {
231              consumer.onComplete();
232            } finally {
233              span.setStatus(StatusCode.OK);
234              span.end();
235            }
236          }
237        }
238      });
239  }
240
241  private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
242    try (Scope ignored = span.makeCurrent()) {
243      return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
244        .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
245        .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
246        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
247        .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
248        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
249        .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
250    }
251  }
252
253  private long getPrimaryTimeoutNs() {
254    return TableName.isMetaTableName(tableName)
255      ? conn.connConf.getPrimaryMetaScanTimeoutNs()
256      : conn.connConf.getPrimaryScanTimeoutNs();
257  }
258
259  private void openScanner() {
260    if (this.isScanMetricsByRegionEnabled) {
261      scanMetrics.moveToNextRegion();
262    }
263    incRegionCountMetrics(scanMetrics);
264    openScannerTries.set(1);
265    addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
266      getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
267      conn.getConnectionMetrics()), (resp, error) -> {
268        try (Scope ignored = span.makeCurrent()) {
269          if (error != null) {
270            try {
271              consumer.onError(error);
272              return;
273            } finally {
274              TraceUtil.setError(span, error);
275              span.end();
276            }
277          }
278          if (this.isScanMetricsByRegionEnabled) {
279            HRegionLocation loc = resp.loc;
280            this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
281              loc.getServerName());
282          }
283          startScan(resp);
284        }
285      });
286  }
287
288  public void start() {
289    try (Scope ignored = span.makeCurrent()) {
290      openScanner();
291    }
292  }
293}