001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import java.io.IOException; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.hbase.HRegionLocation; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController; 039import org.apache.yetus.audience.InterfaceAudience; 040 041import org.apache.hbase.thirdparty.io.netty.util.Timer; 042 043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 048 049/** 050 * The asynchronous client scanner implementation. 051 * <p> 052 * Here we will call OpenScanner first and use the returned scannerId to create a 053 * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of 054 * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish 055 * scan. 056 */ 057@InterfaceAudience.Private 058class AsyncClientScanner { 059 060 // We will use this scan object during the whole scan operation. The 061 // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. 062 private final Scan scan; 063 064 private final ScanMetrics scanMetrics; 065 066 private final AdvancedScanResultConsumer consumer; 067 068 private final TableName tableName; 069 070 private final AsyncConnectionImpl conn; 071 072 private final Timer retryTimer; 073 074 private final long pauseNs; 075 076 private final int maxAttempts; 077 078 private final long scanTimeoutNs; 079 080 private final long rpcTimeoutNs; 081 082 private final int startLogErrorsCnt; 083 084 private final ScanResultCache resultCache; 085 086 public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, 087 AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs, 088 long rpcTimeoutNs, int startLogErrorsCnt) { 089 if (scan.getStartRow() == null) { 090 scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); 091 } 092 if (scan.getStopRow() == null) { 093 scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow()); 094 } 095 this.scan = scan; 096 this.consumer = consumer; 097 this.tableName = tableName; 098 this.conn = conn; 099 this.retryTimer = retryTimer; 100 this.pauseNs = pauseNs; 101 this.maxAttempts = maxAttempts; 102 this.scanTimeoutNs = scanTimeoutNs; 103 this.rpcTimeoutNs = rpcTimeoutNs; 104 this.startLogErrorsCnt = startLogErrorsCnt; 105 this.resultCache = createScanResultCache(scan); 106 if (scan.isScanMetricsEnabled()) { 107 this.scanMetrics = new ScanMetrics(); 108 consumer.onScanMetricsCreated(scanMetrics); 109 } else { 110 this.scanMetrics = null; 111 } 112 } 113 114 private static final class OpenScannerResponse { 115 116 public final HRegionLocation loc; 117 118 public final boolean isRegionServerRemote; 119 120 public final ClientService.Interface stub; 121 122 public final HBaseRpcController controller; 123 124 public final ScanResponse resp; 125 126 public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub, 127 HBaseRpcController controller, ScanResponse resp) { 128 this.loc = loc; 129 this.isRegionServerRemote = isRegionServerRemote; 130 this.stub = stub; 131 this.controller = controller; 132 this.resp = resp; 133 } 134 } 135 136 private final AtomicInteger openScannerTries = new AtomicInteger(); 137 138 private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller, 139 HRegionLocation loc, ClientService.Interface stub) { 140 boolean isRegionServerRemote = isRemote(loc.getHostname()); 141 incRPCCallsMetrics(scanMetrics, isRegionServerRemote); 142 if (openScannerTries.getAndIncrement() > 1) { 143 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 144 } 145 CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>(); 146 try { 147 ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, 148 scan.getCaching(), false); 149 stub.scan(controller, request, resp -> { 150 if (controller.failed()) { 151 future.completeExceptionally(controller.getFailed()); 152 return; 153 } 154 future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); 155 }); 156 } catch (IOException e) { 157 future.completeExceptionally(e); 158 } 159 return future; 160 } 161 162 private void startScan(OpenScannerResponse resp) { 163 addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) 164 .location(resp.loc).remote(resp.isRegionServerRemote) 165 .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) 166 .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) 167 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 168 .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 169 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 170 .start(resp.controller, resp.resp), (hasMore, error) -> { 171 if (error != null) { 172 consumer.onError(error); 173 return; 174 } 175 if (hasMore) { 176 openScanner(); 177 } else { 178 consumer.onComplete(); 179 } 180 }); 181 } 182 183 private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) { 184 return conn.callerFactory.<OpenScannerResponse> single().table(tableName) 185 .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) 186 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 187 .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 188 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) 189 .call(); 190 } 191 192 private long getPrimaryTimeoutNs() { 193 return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs() 194 : conn.connConf.getPrimaryScanTimeoutNs(); 195 } 196 197 private void openScanner() { 198 incRegionCountMetrics(scanMetrics); 199 openScannerTries.set(1); 200 addListener( 201 timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), 202 getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer), 203 (resp, error) -> { 204 if (error != null) { 205 consumer.onError(error); 206 return; 207 } 208 startScan(resp); 209 }); 210 } 211 212 public void start() { 213 openScanner(); 214 } 215}