001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import java.io.IOException; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.hbase.HRegionLocation; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController; 039import org.apache.yetus.audience.InterfaceAudience; 040 041import org.apache.hbase.thirdparty.io.netty.util.Timer; 042 043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 048 049/** 050 * The asynchronous client scanner implementation. 051 * <p> 052 * Here we will call OpenScanner first and use the returned scannerId to create a 053 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of 054 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish 055 * scan. 056 */ 057@InterfaceAudience.Private 058class AsyncClientScanner { 059 060 // We will use this scan object during the whole scan operation. The 061 // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. 062 private final Scan scan; 063 064 private final ScanMetrics scanMetrics; 065 066 private final AdvancedScanResultConsumer consumer; 067 068 private final TableName tableName; 069 070 private final AsyncConnectionImpl conn; 071 072 private final Timer retryTimer; 073 074 private final long pauseNs; 075 076 private final long pauseForCQTBENs; 077 078 private final int maxAttempts; 079 080 private final long scanTimeoutNs; 081 082 private final long rpcTimeoutNs; 083 084 private final int startLogErrorsCnt; 085 086 private final ScanResultCache resultCache; 087 088 public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, 089 AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, 090 int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 091 if (scan.getStartRow() == null) { 092 scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); 093 } 094 if (scan.getStopRow() == null) { 095 scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow()); 096 } 097 this.scan = scan; 098 this.consumer = consumer; 099 this.tableName = tableName; 100 this.conn = conn; 101 this.retryTimer = retryTimer; 102 this.pauseNs = pauseNs; 103 this.pauseForCQTBENs = pauseForCQTBENs; 104 this.maxAttempts = maxAttempts; 105 this.scanTimeoutNs = scanTimeoutNs; 106 this.rpcTimeoutNs = rpcTimeoutNs; 107 this.startLogErrorsCnt = startLogErrorsCnt; 108 this.resultCache = createScanResultCache(scan); 109 if (scan.isScanMetricsEnabled()) { 110 this.scanMetrics = new ScanMetrics(); 111 consumer.onScanMetricsCreated(scanMetrics); 112 } else { 113 this.scanMetrics = null; 114 } 115 } 116 117 private static final class OpenScannerResponse { 118 119 public final HRegionLocation loc; 120 121 public final boolean isRegionServerRemote; 122 123 public final ClientService.Interface stub; 124 125 public final HBaseRpcController controller; 126 127 public final ScanResponse resp; 128 129 public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, 130 HBaseRpcController controller, ScanResponse resp) { 131 this.loc = loc; 132 this.isRegionServerRemote = isRegionServerRemote; 133 this.stub = stub; 134 this.controller = controller; 135 this.resp = resp; 136 } 137 } 138 139 private final AtomicInteger openScannerTries = new AtomicInteger(); 140 141 private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, 142 HRegionLocation loc, ClientService.Interface stub) { 143 boolean isRegionServerRemote = isRemote(loc.getHostname()); 144 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 145 if (openScannerTries.getAndIncrement() > 1) { 146 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 147 } 148 CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); 149 try { 150 ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, 151 scan.getCaching(), false); 152 stub.scan(controller, request, resp -> { 153 if (controller.failed()) { 154 future.completeExceptionally(controller.getFailed()); 155 return; 156 } 157 future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); 158 }); 159 } catch (IOException e) { 160 future.completeExceptionally(e); 161 } 162 return future; 163 } 164 165 private void startScan(OpenScannerResponse resp) { 166 addListener( 167 conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) 168 .remote(resp.isRegionServerRemote) 169 .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) 170 .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) 171 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 172 .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 173 .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 174 .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), 175 (hasMore, error) -> { 176 if (error != null) { 177 consumer.onError(error); 178 return; 179 } 180 if (hasMore) { 181 openScanner(); 182 } else { 183 consumer.onComplete(); 184 } 185 }); 186 } 187 188 private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { 189 return conn.callerFactory.<OpenScannerResponse> single().table(tableName) 190 .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) 191 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 192 .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 193 .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 194 .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); 195 } 196 197 private long getPrimaryTimeoutNs() { 198 return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs() 199 : conn.connConf.getPrimaryScanTimeoutNs(); 200 } 201 202 private void openScanner() { 203 incRegionCountMetrics(scanMetrics); 204 openScannerTries.set(1); 205 addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), 206 getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, 207 conn.getConnectionMetrics()), (resp, error) -> { 208 if (error != null) { 209 consumer.onError(error); 210 return; 211 } 212 startScan(resp); 213 }); 214 } 215 216 public void start() { 217 openScanner(); 218 } 219}