001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import java.io.IOException;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicInteger;
035import org.apache.hadoop.hbase.HRegionLocation;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController;
039import org.apache.yetus.audience.InterfaceAudience;
040
041import org.apache.hbase.thirdparty.io.netty.util.Timer;
042
043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
048
049/**
050 * The asynchronous client scanner implementation.
051 * <p>
052 * Here we will call OpenScanner first and use the returned scannerId to create a
053 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
054 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
055 * scan.
056 */
057@InterfaceAudience.Private
058class AsyncClientScanner {
059
060  // We will use this scan object during the whole scan operation. The
061  // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
062  private final Scan scan;
063
064  private final ScanMetrics scanMetrics;
065
066  private final AdvancedScanResultConsumer consumer;
067
068  private final TableName tableName;
069
070  private final AsyncConnectionImpl conn;
071
072  private final Timer retryTimer;
073
074  private final long pauseNs;
075
076  private final long pauseForCQTBENs;
077
078  private final int maxAttempts;
079
080  private final long scanTimeoutNs;
081
082  private final long rpcTimeoutNs;
083
084  private final int startLogErrorsCnt;
085
086  private final ScanResultCache resultCache;
087
088  public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
089      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
090      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
091    if (scan.getStartRow() == null) {
092      scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
093    }
094    if (scan.getStopRow() == null) {
095      scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
096    }
097    this.scan = scan;
098    this.consumer = consumer;
099    this.tableName = tableName;
100    this.conn = conn;
101    this.retryTimer = retryTimer;
102    this.pauseNs = pauseNs;
103    this.pauseForCQTBENs = pauseForCQTBENs;
104    this.maxAttempts = maxAttempts;
105    this.scanTimeoutNs = scanTimeoutNs;
106    this.rpcTimeoutNs = rpcTimeoutNs;
107    this.startLogErrorsCnt = startLogErrorsCnt;
108    this.resultCache = createScanResultCache(scan);
109    if (scan.isScanMetricsEnabled()) {
110      this.scanMetrics = new ScanMetrics();
111      consumer.onScanMetricsCreated(scanMetrics);
112    } else {
113      this.scanMetrics = null;
114    }
115  }
116
117  private static final class OpenScannerResponse {
118
119    public final HRegionLocation loc;
120
121    public final boolean isRegionServerRemote;
122
123    public final ClientService.Interface stub;
124
125    public final HBaseRpcController controller;
126
127    public final ScanResponse resp;
128
129    public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
130        HBaseRpcController controller, ScanResponse resp) {
131      this.loc = loc;
132      this.isRegionServerRemote = isRegionServerRemote;
133      this.stub = stub;
134      this.controller = controller;
135      this.resp = resp;
136    }
137  }
138
139  private final AtomicInteger openScannerTries = new AtomicInteger();
140
141  private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
142      HRegionLocation loc, ClientService.Interface stub) {
143    boolean isRegionServerRemote = isRemote(loc.getHostname());
144    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
145    if (openScannerTries.getAndIncrement() > 1) {
146      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
147    }
148    CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
149    try {
150      ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
151        scan.getCaching(), false);
152      stub.scan(controller, request, resp -> {
153        if (controller.failed()) {
154          future.completeExceptionally(controller.getFailed());
155          return;
156        }
157        future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
158      });
159    } catch (IOException e) {
160      future.completeExceptionally(e);
161    }
162    return future;
163  }
164
165  private void startScan(OpenScannerResponse resp) {
166    addListener(
167      conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
168        .remote(resp.isRegionServerRemote)
169        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
170        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
171        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
172        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
173        .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
174        .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
175      (hasMore, error) -> {
176        if (error != null) {
177          consumer.onError(error);
178          return;
179        }
180        if (hasMore) {
181          openScanner();
182        } else {
183          consumer.onComplete();
184        }
185      });
186  }
187
188  private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
189    return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
190      .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
191      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
192      .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
193      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
194      .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
195  }
196
197  private long getPrimaryTimeoutNs() {
198    return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
199      : conn.connConf.getPrimaryScanTimeoutNs();
200  }
201
202  private void openScanner() {
203    incRegionCountMetrics(scanMetrics);
204    openScannerTries.set(1);
205    addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
206      getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
207      conn.getConnectionMetrics()), (resp, error) -> {
208        if (error != null) {
209          consumer.onError(error);
210          return;
211        }
212        startScan(resp);
213      });
214  }
215
216  public void start() {
217    openScanner();
218  }
219}