001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.concurrent.locks.Condition;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031import java.util.function.Consumer;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * ClientAsyncPrefetchScanner implements async scanner behaviour.
040 * Specifically, the cache used by this scanner is a concurrent queue which allows both
041 * the producer (hbase client) and consumer (application) to access the queue in parallel.
042 * The number of rows returned in a prefetch is defined by the caching factor and the result size
043 * factor.
044 * This class allocates a buffer cache, whose size is a function of both factors.
045 * The prefetch is invoked when the cache is half­filled, instead of waiting for it to be empty.
046 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
047 */
048@InterfaceAudience.Private
049public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
050
051  private long maxCacheSize;
052  private AtomicLong cacheSizeInBytes;
053  // exception queue (from prefetch to main scan execution)
054  private final Queue<Exception> exceptionsQueue;
055  // used for testing
056  private Consumer<Boolean> prefetchListener;
057
058  private final Lock lock = new ReentrantLock();
059  private final Condition notEmpty = lock.newCondition();
060  private final Condition notFull = lock.newCondition();
061
062  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
063      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
064      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
065      int replicaCallTimeoutMicroSecondScan) throws IOException {
066    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
067        replicaCallTimeoutMicroSecondScan);
068    exceptionsQueue = new ConcurrentLinkedQueue<>();
069    Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
070  }
071
072  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
073    this.prefetchListener = prefetchListener;
074  }
075
076  @Override
077  protected void initCache() {
078    // Override to put a different cache in place of the super's -- a concurrent one.
079    cache = new LinkedBlockingQueue<>();
080    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
081    cacheSizeInBytes = new AtomicLong(0);
082  }
083
084  private long resultSize2CacheSize(long maxResultSize) {
085    // * 2 if possible
086    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
087  }
088
089  @Override
090  public Result next() throws IOException {
091    try {
092      lock.lock();
093      while (cache.isEmpty()) {
094        handleException();
095        if (this.closed) {
096          return null;
097        }
098        try {
099          notEmpty.await();
100        } catch (InterruptedException e) {
101          throw new InterruptedIOException("Interrupted when wait to load cache");
102        }
103      }
104
105      Result result = pollCache();
106      if (prefetchCondition()) {
107        notFull.signalAll();
108      }
109      return result;
110    } finally {
111      lock.unlock();
112      handleException();
113    }
114  }
115
116  @Override
117  public void close() {
118    try {
119      lock.lock();
120      super.close();
121      closed = true;
122      notFull.signalAll();
123      notEmpty.signalAll();
124    } finally {
125      lock.unlock();
126    }
127  }
128
129  @Override
130  protected void addEstimatedSize(long estimatedSize) {
131    cacheSizeInBytes.addAndGet(estimatedSize);
132  }
133
134  private void handleException() throws IOException {
135    //The prefetch task running in the background puts any exception it
136    //catches into this exception queue.
137    // Rethrow the exception so the application can handle it.
138    while (!exceptionsQueue.isEmpty()) {
139      Exception first = exceptionsQueue.peek();
140      first.printStackTrace();
141      if (first instanceof IOException) {
142        throw (IOException) first;
143      }
144      throw (RuntimeException) first;
145    }
146  }
147
148  private boolean prefetchCondition() {
149    return cacheSizeInBytes.get() < maxCacheSize / 2;
150  }
151
152  private Result pollCache() {
153    Result res = cache.poll();
154    long estimatedSize = calcEstimatedSize(res);
155    addEstimatedSize(-estimatedSize);
156    return res;
157  }
158
159  private class PrefetchRunnable implements Runnable {
160
161    @Override
162    public void run() {
163      while (!closed) {
164        boolean succeed = false;
165        try {
166          lock.lock();
167          while (!prefetchCondition()) {
168            notFull.await();
169          }
170          loadCache();
171          succeed = true;
172        } catch (Exception e) {
173          exceptionsQueue.add(e);
174        } finally {
175          notEmpty.signalAll();
176          lock.unlock();
177          if (prefetchListener != null) {
178            prefetchListener.accept(succeed);
179          }
180        }
181      }
182    }
183
184  }
185
186}