001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayDeque;
025import java.util.Queue;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
031import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
032import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
033
034/**
035 * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
036 * in background and cache it in memory. Typically the {@link #maxCacheSize} will be
037 * {@code 2 * scan.getMaxResultSize()}.
038 */
039@InterfaceAudience.Private
040class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {
041
042  private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
043
044  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
045
046  private final long maxCacheSize;
047
048  private final Scan scan;
049
050  private final Queue<Result> queue = new ArrayDeque<>();
051
052  private ScanMetrics scanMetrics;
053
054  private long cacheSize;
055
056  private boolean closed = false;
057
058  private Throwable error;
059
060  private ScanResumer resumer;
061
062  public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
063      long maxCacheSize) {
064    this.rawTable = table;
065    this.maxCacheSize = maxCacheSize;
066    this.scan = scan;
067    table.scan(scan, this);
068  }
069
070  private void addToCache(Result result) {
071    queue.add(result);
072    cacheSize += calcEstimatedSize(result);
073  }
074
075  private void stopPrefetch(ScanController controller) {
076    if (LOG.isDebugEnabled()) {
077      LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
078        " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
079        cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
080    }
081    resumer = controller.suspend();
082  }
083
084  @Override
085  public synchronized void onNext(Result[] results, ScanController controller) {
086    assert results.length > 0;
087    if (closed) {
088      controller.terminate();
089      return;
090    }
091    for (Result result : results) {
092      addToCache(result);
093    }
094    notifyAll();
095    if (cacheSize >= maxCacheSize) {
096      stopPrefetch(controller);
097    }
098  }
099
100  @Override
101  public synchronized void onHeartbeat(ScanController controller) {
102    if (closed) {
103      controller.terminate();
104      return;
105    }
106    if (scan.isNeedCursorResult()) {
107      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
108    }
109  }
110
111  @Override
112  public synchronized void onError(Throwable error) {
113    this.error = error;
114    notifyAll();
115  }
116
117  @Override
118  public synchronized void onComplete() {
119    closed = true;
120    notifyAll();
121  }
122
123  @Override
124  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
125    this.scanMetrics = scanMetrics;
126  }
127
128  private void resumePrefetch() {
129    if (LOG.isDebugEnabled()) {
130      LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
131    }
132    resumer.resume();
133    resumer = null;
134  }
135
136  @Override
137  public synchronized Result next() throws IOException {
138    while (queue.isEmpty()) {
139      if (closed) {
140        return null;
141      }
142      if (error != null) {
143        Throwables.propagateIfPossible(error, IOException.class);
144        throw new IOException(error);
145      }
146      try {
147        wait();
148      } catch (InterruptedException e) {
149        throw new InterruptedIOException();
150      }
151    }
152    Result result = queue.poll();
153    if (!result.isCursor()) {
154      cacheSize -= calcEstimatedSize(result);
155      if (resumer != null && cacheSize <= maxCacheSize / 2) {
156        resumePrefetch();
157      }
158    }
159    return result;
160  }
161
162  @Override
163  public synchronized void close() {
164    closed = true;
165    queue.clear();
166    cacheSize = 0;
167    if (resumer != null) {
168      resumePrefetch();
169    }
170    notifyAll();
171  }
172
173  @Override
174  public boolean renewLease() {
175    // we will do prefetching in the background and if there is no space we will just suspend the
176    // scanner. The renew lease operation will be handled in the background.
177    return false;
178  }
179
180  // used in tests to test whether the scanner has been suspended
181  @VisibleForTesting
182  synchronized boolean isSuspended() {
183    return resumer != null;
184  }
185
186  @Override
187  public ScanMetrics getScanMetrics() {
188    return scanMetrics;
189  }
190}