001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021
022import io.opentelemetry.api.trace.Span;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayDeque;
026import java.util.Queue;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
029import org.apache.hadoop.hbase.util.FutureUtils;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
036 * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
037 * {@code 2 * scan.getMaxResultSize()}.
038 */
039@InterfaceAudience.Private
040class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {
041
042  private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
043
044  private final TableName tableName;
045
046  private final long maxCacheSize;
047
048  private final Scan scan;
049
050  private final Queue<Result> queue = new ArrayDeque<>();
051
052  private ScanMetrics scanMetrics;
053
054  private long cacheSize;
055
056  private boolean closed = false;
057
058  private Throwable error;
059
060  private ScanResumer resumer;
061
062  // Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
063  private Span span = null;
064
065  public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
066    this.tableName = tableName;
067    this.maxCacheSize = maxCacheSize;
068    this.scan = scan;
069  }
070
071  private void addToCache(Result result) {
072    queue.add(result);
073    cacheSize += calcEstimatedSize(result);
074  }
075
076  private void stopPrefetch(ScanController controller) {
077    if (LOG.isDebugEnabled()) {
078      LOG.debug(
079        "{} stop prefetching when scanning {} as the cache size {}"
080          + " is greater than the maxCacheSize {}",
081        String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize, maxCacheSize);
082    }
083    resumer = controller.suspend();
084  }
085
086  Span getSpan() {
087    return span;
088  }
089
090  void setSpan(final Span span) {
091    this.span = span;
092  }
093
094  @Override
095  public synchronized void onNext(Result[] results, ScanController controller) {
096    assert results.length > 0;
097    if (closed) {
098      controller.terminate();
099      return;
100    }
101    for (Result result : results) {
102      addToCache(result);
103    }
104    notifyAll();
105    if (cacheSize >= maxCacheSize) {
106      stopPrefetch(controller);
107    }
108  }
109
110  @Override
111  public synchronized void onHeartbeat(ScanController controller) {
112    if (closed) {
113      controller.terminate();
114      return;
115    }
116    if (scan.isNeedCursorResult()) {
117      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
118    }
119  }
120
121  @Override
122  public synchronized void onError(Throwable error) {
123    this.error = error;
124    notifyAll();
125  }
126
127  @Override
128  public synchronized void onComplete() {
129    closed = true;
130    notifyAll();
131  }
132
133  @Override
134  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
135    this.scanMetrics = scanMetrics;
136  }
137
138  private void resumePrefetch() {
139    if (LOG.isDebugEnabled()) {
140      LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
141    }
142    resumer.resume();
143    resumer = null;
144  }
145
146  @Override
147  public synchronized Result next() throws IOException {
148    while (queue.isEmpty()) {
149      if (closed) {
150        return null;
151      }
152      if (error != null) {
153        throw FutureUtils.rethrow(error);
154      }
155      try {
156        wait();
157      } catch (InterruptedException e) {
158        throw new InterruptedIOException();
159      }
160    }
161    Result result = queue.poll();
162    if (!result.isCursor()) {
163      cacheSize -= calcEstimatedSize(result);
164      if (resumer != null && cacheSize <= maxCacheSize / 2) {
165        resumePrefetch();
166      }
167    }
168    return result;
169  }
170
171  @Override
172  public synchronized void close() {
173    closed = true;
174    queue.clear();
175    cacheSize = 0;
176    if (resumer != null) {
177      resumePrefetch();
178    }
179    notifyAll();
180  }
181
182  @Override
183  public boolean renewLease() {
184    // we will do prefetching in the background and if there is no space we will just suspend the
185    // scanner. The renew lease operation will be handled in the background.
186    return false;
187  }
188
189  // used in tests to test whether the scanner has been suspended
190  synchronized boolean isSuspended() {
191    return resumer != null;
192  }
193
194  @Override
195  public ScanMetrics getScanMetrics() {
196    return scanMetrics;
197  }
198}