001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
021
022import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.Queue;
027import java.util.concurrent.ConcurrentLinkedQueue;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.concurrent.locks.Condition;
032import java.util.concurrent.locks.Lock;
033import java.util.concurrent.locks.ReentrantLock;
034import java.util.function.Consumer;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
040import org.apache.hadoop.hbase.util.Threads;
041
042/**
043 * ClientAsyncPrefetchScanner implements async scanner behaviour.
044 * Specifically, the cache used by this scanner is a concurrent queue which allows both
045 * the producer (hbase client) and consumer (application) to access the queue in parallel.
046 * The number of rows returned in a prefetch is defined by the caching factor and the result size
047 * factor.
048 * This class allocates a buffer cache, whose size is a function of both factors.
049 * The prefetch is invoked when the cache is half­filled, instead of waiting for it to be empty.
050 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
051 */
052@InterfaceAudience.Private
053public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
054
055  private long maxCacheSize;
056  private AtomicLong cacheSizeInBytes;
057  // exception queue (from prefetch to main scan execution)
058  private Queue<Exception> exceptionsQueue;
059  // prefetch thread to be executed asynchronously
060  private Thread prefetcher;
061  // used for testing
062  private Consumer<Boolean> prefetchListener;
063
064  private final Lock lock = new ReentrantLock();
065  private final Condition notEmpty = lock.newCondition();
066  private final Condition notFull = lock.newCondition();
067
068  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
069      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
070      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
071      int replicaCallTimeoutMicroSecondScan) throws IOException {
072    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
073        replicaCallTimeoutMicroSecondScan);
074  }
075
076  @VisibleForTesting
077  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
078    this.prefetchListener = prefetchListener;
079  }
080
081  @Override
082  protected void initCache() {
083    // concurrent cache
084    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
085    cache = new LinkedBlockingQueue<>();
086    cacheSizeInBytes = new AtomicLong(0);
087    exceptionsQueue = new ConcurrentLinkedQueue<>();
088    prefetcher = new Thread(new PrefetchRunnable());
089    Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
090  }
091
092  private long resultSize2CacheSize(long maxResultSize) {
093    // * 2 if possible
094    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
095  }
096
097  @Override
098  public Result next() throws IOException {
099    try {
100      lock.lock();
101      while (cache.isEmpty()) {
102        handleException();
103        if (this.closed) {
104          return null;
105        }
106        try {
107          notEmpty.await();
108        } catch (InterruptedException e) {
109          throw new InterruptedIOException("Interrupted when wait to load cache");
110        }
111      }
112
113      Result result = pollCache();
114      if (prefetchCondition()) {
115        notFull.signalAll();
116      }
117      return result;
118    } finally {
119      lock.unlock();
120      handleException();
121    }
122  }
123
124  @Override
125  public void close() {
126    try {
127      lock.lock();
128      super.close();
129      closed = true;
130      notFull.signalAll();
131      notEmpty.signalAll();
132    } finally {
133      lock.unlock();
134    }
135  }
136
137  @Override
138  protected void addEstimatedSize(long estimatedSize) {
139    cacheSizeInBytes.addAndGet(estimatedSize);
140  }
141
142  private void handleException() throws IOException {
143    //The prefetch task running in the background puts any exception it
144    //catches into this exception queue.
145    // Rethrow the exception so the application can handle it.
146    while (!exceptionsQueue.isEmpty()) {
147      Exception first = exceptionsQueue.peek();
148      first.printStackTrace();
149      if (first instanceof IOException) {
150        throw (IOException) first;
151      }
152      throw (RuntimeException) first;
153    }
154  }
155
156  private boolean prefetchCondition() {
157    return cacheSizeInBytes.get() < maxCacheSize / 2;
158  }
159
160  private Result pollCache() {
161    Result res = cache.poll();
162    long estimatedSize = calcEstimatedSize(res);
163    addEstimatedSize(-estimatedSize);
164    return res;
165  }
166
167  private class PrefetchRunnable implements Runnable {
168
169    @Override
170    public void run() {
171      while (!closed) {
172        boolean succeed = false;
173        try {
174          lock.lock();
175          while (!prefetchCondition()) {
176            notFull.await();
177          }
178          loadCache();
179          succeed = true;
180        } catch (Exception e) {
181          exceptionsQueue.add(e);
182        } finally {
183          notEmpty.signalAll();
184          lock.unlock();
185          if (prefetchListener != null) {
186            prefetchListener.accept(succeed);
187          }
188        }
189      }
190    }
191
192  }
193
194}