001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.concurrent.locks.Condition;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031import java.util.function.Consumer;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
038
039/**
040 * ClientAsyncPrefetchScanner implements async scanner behaviour.
041 * Specifically, the cache used by this scanner is a concurrent queue which allows both
042 * the producer (hbase client) and consumer (application) to access the queue in parallel.
043 * The number of rows returned in a prefetch is defined by the caching factor and the result size
044 * factor.
045 * This class allocates a buffer cache, whose size is a function of both factors.
046 * The prefetch is invoked when the cache is half­filled, instead of waiting for it to be empty.
047 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
048 */
049@InterfaceAudience.Private
050public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
051
052  private long maxCacheSize;
053  private AtomicLong cacheSizeInBytes;
054  // exception queue (from prefetch to main scan execution)
055  private final Queue<Exception> exceptionsQueue;
056  // used for testing
057  private Consumer<Boolean> prefetchListener;
058
059  private final Lock lock = new ReentrantLock();
060  private final Condition notEmpty = lock.newCondition();
061  private final Condition notFull = lock.newCondition();
062
063  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
064      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
065      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
066      int replicaCallTimeoutMicroSecondScan) throws IOException {
067    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
068        replicaCallTimeoutMicroSecondScan);
069    exceptionsQueue = new ConcurrentLinkedQueue<>();
070    Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
071  }
072
073  @VisibleForTesting
074  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
075    this.prefetchListener = prefetchListener;
076  }
077
078  @Override
079  protected void initCache() {
080    // Override to put a different cache in place of the super's -- a concurrent one.
081    cache = new LinkedBlockingQueue<>();
082    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
083    cacheSizeInBytes = new AtomicLong(0);
084  }
085
086  private long resultSize2CacheSize(long maxResultSize) {
087    // * 2 if possible
088    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
089  }
090
091  @Override
092  public Result next() throws IOException {
093    try {
094      lock.lock();
095      while (cache.isEmpty()) {
096        handleException();
097        if (this.closed) {
098          return null;
099        }
100        try {
101          notEmpty.await();
102        } catch (InterruptedException e) {
103          throw new InterruptedIOException("Interrupted when wait to load cache");
104        }
105      }
106
107      Result result = pollCache();
108      if (prefetchCondition()) {
109        notFull.signalAll();
110      }
111      return result;
112    } finally {
113      lock.unlock();
114      handleException();
115    }
116  }
117
118  @Override
119  public void close() {
120    try {
121      lock.lock();
122      super.close();
123      closed = true;
124      notFull.signalAll();
125      notEmpty.signalAll();
126    } finally {
127      lock.unlock();
128    }
129  }
130
131  @Override
132  protected void addEstimatedSize(long estimatedSize) {
133    cacheSizeInBytes.addAndGet(estimatedSize);
134  }
135
136  private void handleException() throws IOException {
137    //The prefetch task running in the background puts any exception it
138    //catches into this exception queue.
139    // Rethrow the exception so the application can handle it.
140    while (!exceptionsQueue.isEmpty()) {
141      Exception first = exceptionsQueue.peek();
142      first.printStackTrace();
143      if (first instanceof IOException) {
144        throw (IOException) first;
145      }
146      throw (RuntimeException) first;
147    }
148  }
149
150  private boolean prefetchCondition() {
151    return cacheSizeInBytes.get() < maxCacheSize / 2;
152  }
153
154  private Result pollCache() {
155    Result res = cache.poll();
156    long estimatedSize = calcEstimatedSize(res);
157    addEstimatedSize(-estimatedSize);
158    return res;
159  }
160
161  private class PrefetchRunnable implements Runnable {
162
163    @Override
164    public void run() {
165      while (!closed) {
166        boolean succeed = false;
167        try {
168          lock.lock();
169          while (!prefetchCondition()) {
170            notFull.await();
171          }
172          loadCache();
173          succeed = true;
174        } catch (Exception e) {
175          exceptionsQueue.add(e);
176        } finally {
177          notEmpty.signalAll();
178          lock.unlock();
179          if (prefetchListener != null) {
180            prefetchListener.accept(succeed);
181          }
182        }
183      }
184    }
185
186  }
187
188}