/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
 * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
 * {@code 2 * scan.getMaxResultSize()}.
 * <p>
 * The scan callbacks ({@link #onNext}, {@link #onHeartbeat}, {@link #onError},
 * {@link #onComplete}) and the consumer-facing methods ({@link #next()}, {@link #close()}) are all
 * {@code synchronized} on this scanner and hand results over through an in-memory queue using
 * {@code wait()}/{@code notifyAll()}.
 */
@InterfaceAudience.Private
class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);

  private final TableName tableName;

  // Once cacheSize reaches this value the scan is suspended, see onNext.
  private final long maxCacheSize;

  private final Scan scan;

  // Results which have been fetched but not yet returned by next().
  private final Queue<Result> queue = new ArrayDeque<>();

  private ScanMetrics scanMetrics;

  // Sum of the estimated sizes of the non-cursor results currently held in the queue.
  private long cacheSize;

  private boolean closed = false;

  private Throwable error;

  // Non-null only while the scan is suspended.
  private ScanResumer resumer;

  // Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
  private Span span = null;

  public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
    this.tableName = tableName;
    this.maxCacheSize = maxCacheSize;
    this.scan = scan;
  }

  /** Appends {@code result} to the queue and adds its estimated size to {@link #cacheSize}. */
  private void addToCache(Result result) {
    queue.add(result);
    cacheSize += calcEstimatedSize(result);
  }

  /** Suspends the scan through {@code controller} and keeps the resumer for a later resume. */
  private void stopPrefetch(ScanController controller) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "{} stop prefetching when scanning {} as the cache size {}"
          + " is greater than the maxCacheSize {}",
        String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize, maxCacheSize);
    }
    resumer = controller.suspend();
  }

  Span getSpan() {
    return span;
  }

  void setSpan(final Span span) {
    this.span = span;
  }

  /**
   * Caches the fetched results and wakes up any thread blocked in {@link #next()}. Terminates the
   * scan instead if this scanner has already been closed, and suspends it once the cache size
   * reaches {@link #maxCacheSize}.
   */
  @Override
  public synchronized void onNext(Result[] results, ScanController controller) {
    assert results.length > 0;
    if (closed) {
      controller.terminate();
      return;
    }
    for (Result result : results) {
      addToCache(result);
    }
    notifyAll();
    if (cacheSize >= maxCacheSize) {
      stopPrefetch(controller);
    }
  }

  /**
   * Terminates the scan if this scanner has already been closed. Otherwise, when the scan asks for
   * cursor results and the controller provides a cursor, enqueues a cursor result and wakes up any
   * thread blocked in {@link #next()}.
   */
  @Override
  public synchronized void onHeartbeat(ScanController controller) {
    if (closed) {
      controller.terminate();
      return;
    }
    if (scan.isNeedCursorResult()) {
      controller.cursor().ifPresent(c -> {
        queue.add(Result.createCursorResult(c));
        // A consumer may be waiting in next() on an empty queue; without this notification it
        // would not see the cursor until some later callback happens to notify.
        notifyAll();
      });
    }
  }

  /** Records the failure and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onError(Throwable error) {
    this.error = error;
    notifyAll();
  }

  /** Marks this scanner as closed and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onComplete() {
    closed = true;
    notifyAll();
  }

  @Override
  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
    this.scanMetrics = scanMetrics;
  }

  /** Resumes the suspended scan and clears the resumer. Callers must check it is non-null. */
  private void resumePrefetch() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("{} resume prefetching", String.format("0x%x", System.identityHashCode(this)));
    }
    resumer.resume();
    resumer = null;
  }

  /**
   * Returns the next queued result, blocking while the queue is empty.
   * @return the next result, or {@code null} if the queue is empty and this scanner is closed
   * @throws IOException if the queue is empty and an error has been recorded (rethrown through
   *                     {@link FutureUtils#rethrow(Throwable)}), or an
   *                     {@link InterruptedIOException} if the thread is interrupted while waiting
   */
  @Override
  public synchronized Result next() throws IOException {
    while (queue.isEmpty()) {
      if (closed) {
        return null;
      }
      if (error != null) {
        throw FutureUtils.rethrow(error);
      }
      try {
        wait();
      } catch (InterruptedException e) {
        // Keep the original interruption as the cause so that it is not lost.
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(e);
        throw iioe;
      }
    }
    Result result = queue.poll();
    // Cursor results are never added to cacheSize, so they must not be subtracted either.
    if (!result.isCursor()) {
      cacheSize -= calcEstimatedSize(result);
      if (resumer != null && cacheSize <= maxCacheSize / 2) {
        resumePrefetch();
      }
    }
    return result;
  }

  /**
   * Marks this scanner as closed, drops all cached results, resumes the scan if it is currently
   * suspended and wakes up any thread blocked in {@link #next()}.
   */
  @Override
  public synchronized void close() {
    closed = true;
    queue.clear();
    cacheSize = 0;
    if (resumer != null) {
      resumePrefetch();
    }
    notifyAll();
  }

  @Override
  public boolean renewLease() {
    // we will do prefetching in the background and if there is no space we will just suspend the
    // scanner. The renew lease operation will be handled in the background.
    return false;
  }

  // used in tests to test whether the scanner has been suspended
  synchronized boolean isSuspended() {
    return resumer != null;
  }

  @Override
  public ScanMetrics getScanMetrics() {
    return scanMetrics;
  }
}