001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
030import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
031
032import io.opentelemetry.api.trace.Span;
033import io.opentelemetry.api.trace.StatusCode;
034import io.opentelemetry.context.Scope;
035import java.io.IOException;
036import java.util.Map;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
043import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
044import org.apache.hadoop.hbase.ipc.HBaseRpcController;
045import org.apache.hadoop.hbase.trace.TraceUtil;
046import org.apache.yetus.audience.InterfaceAudience;
047
048import org.apache.hbase.thirdparty.io.netty.util.Timer;
049
050import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
055
056/**
057 * The asynchronous client scanner implementation.
058 * <p>
059 * Here we will call OpenScanner first and use the returned scannerId to create a
060 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
061 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
062 * scan.
063 */
064@InterfaceAudience.Private
065class AsyncClientScanner {
066
067  // We will use this scan object during the whole scan operation. The
068  // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
069  private final Scan scan;
070
071  private final ScanMetrics scanMetrics;
072
073  private final AdvancedScanResultConsumer consumer;
074
075  private final TableName tableName;
076
077  private final AsyncConnectionImpl conn;
078
079  private final Timer retryTimer;
080
081  private final long pauseNs;
082
083  private final long pauseNsForServerOverloaded;
084
085  private final int maxAttempts;
086
087  private final long scanTimeoutNs;
088
089  private final long rpcTimeoutNs;
090
091  private final int startLogErrorsCnt;
092
093  private final ScanResultCache resultCache;
094
095  private final Span span;
096
097  private final Map<String, byte[]> requestAttributes;
098
099  private final boolean isScanMetricsByRegionEnabled;
100
101  public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
102    AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
103    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
104    Map<String, byte[]> requestAttributes) {
105    if (scan.getStartRow() == null) {
106      scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
107    }
108    if (scan.getStopRow() == null) {
109      scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
110    }
111    this.scan = scan;
112    this.consumer = consumer;
113    this.tableName = tableName;
114    this.conn = conn;
115    this.retryTimer = retryTimer;
116    this.pauseNs = pauseNs;
117    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
118    this.maxAttempts = maxAttempts;
119    this.scanTimeoutNs = scanTimeoutNs;
120    this.rpcTimeoutNs = rpcTimeoutNs;
121    this.startLogErrorsCnt = startLogErrorsCnt;
122    this.resultCache = createScanResultCache(scan);
123    this.requestAttributes = requestAttributes;
124    boolean isScanMetricsByRegionEnabled = false;
125    if (scan.isScanMetricsEnabled()) {
126      this.scanMetrics = new ScanMetrics();
127      consumer.onScanMetricsCreated(scanMetrics);
128      if (this.scan.isScanMetricsByRegionEnabled()) {
129        isScanMetricsByRegionEnabled = true;
130      }
131    } else {
132      this.scanMetrics = null;
133    }
134    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
135
136    /*
137     * Assumes that the `start()` method is called immediately after construction. If this is no
138     * longer the case, for tracing correctness, we should move the start of the span into the
139     * `start()` method. The cost of doing so would be making access to the `span` safe for
140     * concurrent threads.
141     */
142    span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build();
143    if (consumer instanceof AsyncTableResultScanner) {
144      AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
145      scanner.setSpan(span);
146    }
147  }
148
149  private static final class OpenScannerResponse {
150
151    public final HRegionLocation loc;
152
153    public final boolean isRegionServerRemote;
154
155    public final ClientService.Interface stub;
156
157    public final HBaseRpcController controller;
158
159    public final ScanResponse resp;
160
161    public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
162      HBaseRpcController controller, ScanResponse resp) {
163      this.loc = loc;
164      this.isRegionServerRemote = isRegionServerRemote;
165      this.stub = stub;
166      this.controller = controller;
167      this.resp = resp;
168    }
169  }
170
171  private final AtomicInteger openScannerTries = new AtomicInteger();
172
173  private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
174    HRegionLocation loc, ClientService.Interface stub) {
175    try (Scope ignored = span.makeCurrent()) {
176      boolean isRegionServerRemote = isRemote(loc.getHostname());
177      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
178      if (openScannerTries.getAndIncrement() > 1) {
179        incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
180      }
181      CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
182      try {
183        ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
184          scan, scan.getCaching(), false);
185        stub.scan(controller, request, resp -> {
186          try (Scope ignored1 = span.makeCurrent()) {
187            if (controller.failed()) {
188              final IOException e = controller.getFailed();
189              future.completeExceptionally(e);
190              TraceUtil.setError(span, e);
191              span.end();
192              return;
193            }
194            future
195              .complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
196          }
197        });
198      } catch (IOException e) {
199        // span is closed by listener attached to the Future in `openScanner()`
200        future.completeExceptionally(e);
201      }
202      return future;
203    }
204  }
205
206  // lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan metrics
207  private void startScan(OpenScannerResponse resp, long lastNextCallNanos) {
208    AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller =
209      conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
210        .remote(resp.isRegionServerRemote)
211        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
212        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
213        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
214        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
215        .lastNextCallNanos(lastNextCallNanos)
216        .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
217        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
218        .setRequestAttributes(requestAttributes).build();
219    addListener(scanSingleRegionCaller.start(resp.controller, resp.resp), (hasMore, error) -> {
220      try (Scope ignored = span.makeCurrent()) {
221        if (error != null) {
222          try {
223            consumer.onError(error);
224            return;
225          } finally {
226            TraceUtil.setError(span, error);
227            span.end();
228          }
229        }
230        if (hasMore) {
231          openScanner(scanSingleRegionCaller);
232        } else {
233          try {
234            consumer.onComplete();
235          } finally {
236            span.setStatus(StatusCode.OK);
237            span.end();
238          }
239        }
240      }
241    });
242  }
243
244  private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
245    try (Scope ignored = span.makeCurrent()) {
246      return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
247        .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
248        .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
249        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
250        .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
251        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
252        .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
253    }
254  }
255
256  private long getPrimaryTimeoutNs() {
257    return TableName.isMetaTableName(tableName)
258      ? conn.connConf.getPrimaryMetaScanTimeoutNs()
259      : conn.connConf.getPrimaryScanTimeoutNs();
260  }
261
262  private void openScanner(AsyncScanSingleRegionRpcRetryingCaller previousScanSingleRegionCaller) {
263    if (this.isScanMetricsByRegionEnabled) {
264      scanMetrics.moveToNextRegion();
265    }
266    incRegionCountMetrics(scanMetrics);
267    long openScannerStartNanos = System.nanoTime();
268    if (previousScanSingleRegionCaller != null) {
269      // open scanner is also a next call
270      incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS
271        .toMillis(openScannerStartNanos - previousScanSingleRegionCaller.getLastNextCallNanos()));
272    }
273    openScannerTries.set(1);
274    addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
275      getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
276      conn.getConnectionMetrics()), (resp, error) -> {
277        try (Scope ignored = span.makeCurrent()) {
278          if (error != null) {
279            try {
280              consumer.onError(error);
281              return;
282            } finally {
283              TraceUtil.setError(span, error);
284              span.end();
285            }
286          }
287          if (this.isScanMetricsByRegionEnabled) {
288            HRegionLocation loc = resp.loc;
289            this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
290              loc.getServerName());
291          }
292          startScan(resp, openScannerStartNanos);
293        }
294      });
295  }
296
297  public void start() {
298    try (Scope ignored = span.makeCurrent()) {
299      openScanner(null);
300    }
301  }
302}