001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021 022import io.opentelemetry.api.trace.Span; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayDeque; 026import java.util.Queue; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl; 029import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 030import org.apache.hadoop.hbase.util.FutureUtils; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data 037 * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be 038 * {@code 2 * scan.getMaxResultSize()}. 039 */ 040@InterfaceAudience.Private 041class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer { 042 043 private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class); 044 045 private final TableName tableName; 046 047 private final long maxCacheSize; 048 049 private final Scan scan; 050 051 private final Queue<Result> queue = new ArrayDeque<>(); 052 053 private ScanMetrics scanMetrics; 054 055 private long cacheSize; 056 057 private boolean closed = false; 058 059 private Throwable error; 060 061 private ScanResumer resumer; 062 063 // Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`. 064 private Span span = null; 065 066 public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) { 067 this.tableName = tableName; 068 this.maxCacheSize = maxCacheSize; 069 this.scan = scan; 070 } 071 072 private void addToCache(Result result) { 073 queue.add(result); 074 cacheSize += calcEstimatedSize(result); 075 } 076 077 private void stopPrefetch(ScanController controller) { 078 if (LOG.isDebugEnabled()) { 079 LOG.debug( 080 "{} stop prefetching when scanning {} as the cache size {}" 081 + " is greater than the maxCacheSize {}", 082 String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize, maxCacheSize); 083 } 084 resumer = controller.suspend(); 085 } 086 087 Span getSpan() { 088 return span; 089 } 090 091 void setSpan(final Span span) { 092 this.span = span; 093 } 094 095 @Override 096 public synchronized void onNext(Result[] results, ScanController controller) { 097 assert results.length > 0; 098 if (closed) { 099 controller.terminate(); 100 return; 101 } 102 for (Result result : results) { 103 addToCache(result); 104 } 105 notifyAll(); 106 if (cacheSize >= maxCacheSize) { 107 stopPrefetch(controller); 108 } 109 } 110 111 @Override 112 public synchronized void onHeartbeat(ScanController controller) { 113 if (closed) { 114 controller.terminate(); 115 return; 116 } 117 if (scan.isNeedCursorResult()) { 118 controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c))); 119 } 120 } 121 122 @Override 123 public synchronized void onError(Throwable error) { 124 this.error = error; 125 notifyAll(); 126 } 127 128 @Override 129 public synchronized void onComplete() { 130 closed = true; 131 notifyAll(); 132 } 133 134 @Override 135 public void onScanMetricsCreated(ScanMetrics scanMetrics) { 136 this.scanMetrics = scanMetrics; 137 } 138 139 private void resumePrefetch() { 140 if (LOG.isDebugEnabled()) { 141 LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); 142 } 143 resumer.resume(); 144 resumer = null; 145 } 146 147 private void terminateResumerIfPossible() { 148 if (resumer == null) { 149 return; 150 } 151 // AsyncTableResultScanner.close means we do not need scan results any more, but for 152 // ScanResumerImpl.resume, it would perform another scan on RegionServer and call 153 // AsyncTableResultScanner.onNext again when ScanResponse is received. This time 154 // AsyncTableResultScanner.onNext would do nothing else but just discard ScanResponse 155 // because AsyncTableResultScanner.closed is true. So here we would better save this 156 // unnecessary scan on RegionServer and introduce ScanResumerImpl.terminate to close 157 // scanner directly. 158 if (resumer instanceof ScanResumerImpl) { 159 ((ScanResumerImpl) resumer).terminate(); 160 } else { 161 resumePrefetch(); 162 } 163 resumer = null; 164 } 165 166 @Override 167 public synchronized Result next() throws IOException { 168 while (queue.isEmpty()) { 169 if (closed) { 170 return null; 171 } 172 if (error != null) { 173 throw FutureUtils.rethrow(error); 174 } 175 try { 176 wait(); 177 } catch (InterruptedException e) { 178 throw new InterruptedIOException(); 179 } 180 } 181 Result result = queue.poll(); 182 if (!result.isCursor()) { 183 cacheSize -= calcEstimatedSize(result); 184 if (resumer != null && cacheSize <= maxCacheSize / 2) { 185 resumePrefetch(); 186 } 187 } 188 return result; 189 } 190 191 @Override 192 public synchronized void close() { 193 closed = true; 194 queue.clear(); 195 cacheSize = 0; 196 terminateResumerIfPossible(); 197 notifyAll(); 198 } 199 200 @Override 201 public boolean renewLease() { 202 // we will do prefetching in the background and if there is no space we will just suspend the 203 // scanner. The renew lease operation will be handled in the background. 204 return false; 205 } 206 207 // used in tests to test whether the scanner has been suspended 208 synchronized boolean isSuspended() { 209 return resumer != null; 210 } 211 212 @Override 213 public ScanMetrics getScanMetrics() { 214 return scanMetrics; 215 } 216 217 int getCacheSize() { 218 return queue.size(); 219 } 220}