/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021
022import io.opentelemetry.context.Context;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.Queue;
027import java.util.concurrent.ConcurrentLinkedQueue;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.Lock;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.function.Consumer;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * ClientAsyncPrefetchScanner implements async scanner behaviour. Specifically, the cache used by
043 * this scanner is a concurrent queue which allows both the producer (hbase client) and consumer
044 * (application) to access the queue in parallel. The number of rows returned in a prefetch is
045 * defined by the caching factor and the result size factor. This class allocates a buffer cache,
046 * whose size is a function of both factors. The prefetch is invoked when the cache is half-filled,
047 * instead of waiting for it to be empty. This is defined in the method
048 * {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
049 */
050@InterfaceAudience.Private
051public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
052
053  private long maxCacheSize;
054  private AtomicLong cacheSizeInBytes;
055  // exception queue (from prefetch to main scan execution)
056  private final Queue<Exception> exceptionsQueue;
057  // used for testing
058  private Consumer<Boolean> prefetchListener;
059
060  private final Lock lock = new ReentrantLock();
061  private final Condition notEmpty = lock.newCondition();
062  private final Condition notFull = lock.newCondition();
063
064  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
065    ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
066    RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
067    int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
068    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
069      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
070    exceptionsQueue = new ConcurrentLinkedQueue<>();
071    final Context context = Context.current();
072    final Runnable runnable = context.wrap(new PrefetchRunnable());
073    Threads.setDaemonThreadRunning(new Thread(runnable), name + ".asyncPrefetcher");
074  }
075
076  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
077    this.prefetchListener = prefetchListener;
078  }
079
080  @Override
081  protected void initCache() {
082    // Override to put a different cache in place of the super's -- a concurrent one.
083    cache = new LinkedBlockingQueue<>();
084    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
085    cacheSizeInBytes = new AtomicLong(0);
086  }
087
088  private long resultSize2CacheSize(long maxResultSize) {
089    // * 2 if possible
090    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
091  }
092
093  @Override
094  public Result next() throws IOException {
095    try (Scope ignored = span.makeCurrent()) {
096      lock.lock();
097      try {
098        while (cache.isEmpty()) {
099          handleException();
100          if (this.closed) {
101            return null;
102          }
103          try {
104            notEmpty.await();
105          } catch (InterruptedException e) {
106            span.recordException(e);
107            throw new InterruptedIOException("Interrupted when wait to load cache");
108          }
109        }
110        Result result = pollCache();
111        if (prefetchCondition()) {
112          notFull.signalAll();
113        }
114        return result;
115      } finally {
116        lock.unlock();
117        handleException();
118      }
119    }
120  }
121
122  @Override
123  public void close() {
124    lock.lock();
125    try {
126      super.close();
127      closed = true;
128      notFull.signalAll();
129      notEmpty.signalAll();
130    } finally {
131      lock.unlock();
132    }
133  }
134
135  @Override
136  protected void addEstimatedSize(long estimatedSize) {
137    cacheSizeInBytes.addAndGet(estimatedSize);
138  }
139
140  private void handleException() throws IOException {
141    // The prefetch task running in the background puts any exception it
142    // catches into this exception queue.
143    // Rethrow the exception so the application can handle it.
144    while (!exceptionsQueue.isEmpty()) {
145      Exception first = exceptionsQueue.peek();
146      first.printStackTrace();
147      if (first instanceof IOException) {
148        throw (IOException) first;
149      }
150      throw (RuntimeException) first;
151    }
152  }
153
154  private boolean prefetchCondition() {
155    return cacheSizeInBytes.get() < maxCacheSize / 2;
156  }
157
158  private Result pollCache() {
159    Result res = cache.poll();
160    long estimatedSize = calcEstimatedSize(res);
161    addEstimatedSize(-estimatedSize);
162    return res;
163  }
164
165  private class PrefetchRunnable implements Runnable {
166
167    @Override
168    public void run() {
169      while (!closed) {
170        boolean succeed = false;
171        try {
172          lock.lock();
173          while (!prefetchCondition()) {
174            notFull.await();
175          }
176          loadCache();
177          succeed = true;
178        } catch (Exception e) {
179          exceptionsQueue.add(e);
180          span.recordException(e);
181        } finally {
182          notEmpty.signalAll();
183          lock.unlock();
184          if (prefetchListener != null) {
185            prefetchListener.accept(succeed);
186          }
187        }
188      }
189    }
190  }
191
192}