001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.Queue; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.atomic.AtomicLong; 028import java.util.concurrent.locks.Condition; 029import java.util.concurrent.locks.Lock; 030import java.util.concurrent.locks.ReentrantLock; 031import java.util.function.Consumer; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 038 039/** 040 * ClientAsyncPrefetchScanner implements async scanner behaviour. 041 * Specifically, the cache used by this scanner is a concurrent queue which allows both 042 * the producer (hbase client) and consumer (application) to access the queue in parallel. 043 * The number of rows returned in a prefetch is defined by the caching factor and the result size 044 * factor. 045 * This class allocates a buffer cache, whose size is a function of both factors. 046 * The prefetch is invoked when the cache is halfÂfilled, instead of waiting for it to be empty. 047 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 048 */ 049@InterfaceAudience.Private 050public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { 051 052 private long maxCacheSize; 053 private AtomicLong cacheSizeInBytes; 054 // exception queue (from prefetch to main scan execution) 055 private final Queue<Exception> exceptionsQueue; 056 // used for testing 057 private Consumer<Boolean> prefetchListener; 058 059 private final Lock lock = new ReentrantLock(); 060 private final Condition notEmpty = lock.newCondition(); 061 private final Condition notFull = lock.newCondition(); 062 063 public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, 064 ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, 065 RpcControllerFactory rpcControllerFactory, ExecutorService pool, 066 int replicaCallTimeoutMicroSecondScan) throws IOException { 067 super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, 068 replicaCallTimeoutMicroSecondScan); 069 exceptionsQueue = new ConcurrentLinkedQueue<>(); 070 Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher"); 071 } 072 073 @VisibleForTesting 074 void setPrefetchListener(Consumer<Boolean> prefetchListener) { 075 this.prefetchListener = prefetchListener; 076 } 077 078 @Override 079 protected void initCache() { 080 // Override to put a different cache in place of the super's -- a concurrent one. 081 cache = new LinkedBlockingQueue<>(); 082 maxCacheSize = resultSize2CacheSize(maxScannerResultSize); 083 cacheSizeInBytes = new AtomicLong(0); 084 } 085 086 private long resultSize2CacheSize(long maxResultSize) { 087 // * 2 if possible 088 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 089 } 090 091 @Override 092 public Result next() throws IOException { 093 try { 094 lock.lock(); 095 while (cache.isEmpty()) { 096 handleException(); 097 if (this.closed) { 098 return null; 099 } 100 try { 101 notEmpty.await(); 102 } catch (InterruptedException e) { 103 throw new InterruptedIOException("Interrupted when wait to load cache"); 104 } 105 } 106 107 Result result = pollCache(); 108 if (prefetchCondition()) { 109 notFull.signalAll(); 110 } 111 return result; 112 } finally { 113 lock.unlock(); 114 handleException(); 115 } 116 } 117 118 @Override 119 public void close() { 120 try { 121 lock.lock(); 122 super.close(); 123 closed = true; 124 notFull.signalAll(); 125 notEmpty.signalAll(); 126 } finally { 127 lock.unlock(); 128 } 129 } 130 131 @Override 132 protected void addEstimatedSize(long estimatedSize) { 133 cacheSizeInBytes.addAndGet(estimatedSize); 134 } 135 136 private void handleException() throws IOException { 137 //The prefetch task running in the background puts any exception it 138 //catches into this exception queue. 139 // Rethrow the exception so the application can handle it. 140 while (!exceptionsQueue.isEmpty()) { 141 Exception first = exceptionsQueue.peek(); 142 first.printStackTrace(); 143 if (first instanceof IOException) { 144 throw (IOException) first; 145 } 146 throw (RuntimeException) first; 147 } 148 } 149 150 private boolean prefetchCondition() { 151 return cacheSizeInBytes.get() < maxCacheSize / 2; 152 } 153 154 private Result pollCache() { 155 Result res = cache.poll(); 156 long estimatedSize = calcEstimatedSize(res); 157 addEstimatedSize(-estimatedSize); 158 return res; 159 } 160 161 private class PrefetchRunnable implements Runnable { 162 163 @Override 164 public void run() { 165 while (!closed) { 166 boolean succeed = false; 167 try { 168 lock.lock(); 169 while (!prefetchCondition()) { 170 notFull.await(); 171 } 172 loadCache(); 173 succeed = true; 174 } catch (Exception e) { 175 exceptionsQueue.add(e); 176 } finally { 177 notEmpty.signalAll(); 178 lock.unlock(); 179 if (prefetchListener != null) { 180 prefetchListener.accept(succeed); 181 } 182 } 183 } 184 } 185 186 } 187 188}