/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * ClientAsyncPrefetchScanner implements async scanner behaviour.
 * Specifically, the cache used by this scanner is a concurrent queue which allows both
 * the producer (hbase client) and consumer (application) to access the queue in parallel.
 * The number of rows returned in a prefetch is defined by the caching factor and the result size
 * factor.
 * This class allocates a buffer cache, whose size is a function of both factors.
 * The prefetch is invoked when the cache is half-filled, instead of waiting for it to be empty.
 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
 */
@InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {

  // NOTE: maxCacheSize and cacheSizeInBytes are assigned in initCache(). They intentionally have
  // no field initializer: if initCache() is invoked from the superclass constructor (presumably
  // it is -- verify against ClientScanner), an initializer here would run afterwards and
  // overwrite the values set by initCache().

  // upper bound, in bytes, used by prefetchCondition() to decide when to prefetch
  private long maxCacheSize;
  // estimated number of bytes currently held in the cache
  private AtomicLong cacheSizeInBytes;
  // exception queue (from prefetch to main scan execution)
  private final Queue<Exception> exceptionsQueue;
  // used for testing
  private Consumer<Boolean> prefetchListener;

  // guards the hand-off between the prefetch thread and the thread calling next()/close()
  private final Lock lock = new ReentrantLock();
  // signalled by the prefetch thread after every load attempt, and by close()
  private final Condition notEmpty = lock.newCondition();
  // signalled by next() once prefetchCondition() holds, and by close()
  private final Condition notFull = lock.newCondition();

  /**
   * Creates the scanner and starts a daemon thread named {@code <table name>.asyncPrefetcher}
   * that runs a {@link PrefetchRunnable}. All arguments are passed unchanged to the superclass
   * constructor.
   * @throws IOException if the superclass constructor fails
   */
  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
      int replicaCallTimeoutMicroSecondScan) throws IOException {
    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
        replicaCallTimeoutMicroSecondScan);
    exceptionsQueue = new ConcurrentLinkedQueue<>();
    Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
  }

  /**
   * Registers a callback invoked by the prefetch thread after every load attempt; the argument
   * is {@code true} when the attempt completed without an exception. Used for testing.
   */
  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
    this.prefetchListener = prefetchListener;
  }

  @Override
  protected void initCache() {
    // Override to put a different cache in place of the super's -- a concurrent one.
    cache = new LinkedBlockingQueue<>();
    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
    cacheSizeInBytes = new AtomicLong(0);
  }

  /**
   * Derives the cache capacity from the maximum result size: twice the result size, unless
   * doubling would overflow a {@code long}, in which case the result size itself is used.
   */
  private long resultSize2CacheSize(long maxResultSize) {
    // * 2 if possible
    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
  }

  /**
   * Returns the next cached result, blocking until the prefetch thread has put one into the
   * cache.
   * @return the next result, or {@code null} if the cache is empty and the scanner is closed
   * @throws IOException if the prefetch thread recorded a failure, or if the calling thread is
   *           interrupted while waiting (as an {@link InterruptedIOException})
   */
  @Override
  public Result next() throws IOException {
    // Acquire before entering the try block so that the finally clause never attempts to unlock
    // a lock this thread does not hold.
    lock.lock();
    try {
      while (cache.isEmpty()) {
        handleException();
        if (this.closed) {
          return null;
        }
        try {
          notEmpty.await();
        } catch (InterruptedException e) {
          InterruptedIOException iioe =
              new InterruptedIOException("Interrupted when wait to load cache");
          // keep the original interruption visible in the stack trace
          iioe.initCause(e);
          throw iioe;
        }
      }

      Result result = pollCache();
      if (prefetchCondition()) {
        // the cache dropped below the threshold: wake the prefetch thread
        notFull.signalAll();
      }
      return result;
    } finally {
      lock.unlock();
      // A failure recorded by the prefetch thread takes precedence over the value being
      // returned (an exception thrown from finally replaces the pending return).
      handleException();
    }
  }

  /**
   * Closes the scanner via the superclass, marks it closed and wakes every thread waiting on
   * either condition so that they can observe the closed state.
   */
  @Override
  public void close() {
    lock.lock();
    try {
      super.close();
      closed = true;
      notFull.signalAll();
      notEmpty.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Adds {@code estimatedSize} (which may be negative) to the tracked cache size in bytes. */
  @Override
  protected void addEstimatedSize(long estimatedSize) {
    cacheSizeInBytes.addAndGet(estimatedSize);
  }

  /**
   * Rethrows the oldest exception recorded by the prefetch thread, if any, so the application
   * can handle it. The exception is only peeked, not removed, so it is rethrown on every
   * subsequent call as well.
   * @throws IOException the recorded exception itself when it is an {@link IOException}; an
   *           {@link InterruptedIOException} wrapping a recorded {@link InterruptedException};
   *           or an {@link IOException} wrapping any other checked exception
   */
  private void handleException() throws IOException {
    // The prefetch task running in the background puts any exception it
    // catches into this exception queue.
    Exception first = exceptionsQueue.peek();
    if (first == null) {
      return;
    }
    if (first instanceof IOException) {
      throw (IOException) first;
    }
    if (first instanceof RuntimeException) {
      throw (RuntimeException) first;
    }
    // The prefetch thread catches Exception, so checked non-IO exceptions (for instance the
    // InterruptedException thrown by Condition.await()) can end up here. Casting them to
    // RuntimeException would fail with a ClassCastException, so wrap them instead.
    if (first instanceof InterruptedException) {
      InterruptedIOException iioe = new InterruptedIOException("Interrupted while prefetching");
      iioe.initCause(first);
      throw iioe;
    }
    throw new IOException(first);
  }

  /** Returns {@code true} when the tracked cache size is below half of the maximum size. */
  private boolean prefetchCondition() {
    return cacheSizeInBytes.get() < maxCacheSize / 2;
  }

  /** Removes the head of the cache and subtracts its estimated size from the tracked total. */
  private Result pollCache() {
    Result res = cache.poll();
    long estimatedSize = calcEstimatedSize(res);
    addEstimatedSize(-estimatedSize);
    return res;
  }

  /**
   * Body of the background prefetch thread: until the scanner is closed, waits for the cache to
   * drop below the prefetch threshold and then loads more results into it.
   */
  private class PrefetchRunnable implements Runnable {

    @Override
    public void run() {
      while (!closed) {
        boolean succeed = false;
        lock.lock();
        try {
          // Also stop waiting once the scanner is closed. close() signals notFull, but if the
          // cache is still at least half full the wait would otherwise resume and this thread
          // would never terminate.
          while (!closed && !prefetchCondition()) {
            notFull.await();
          }
          // NOTE(review): may run once more after close(), as it already could before when the
          // cache was below the threshold -- loadCache() is expected to cope with a closed
          // scanner; confirm in the superclass.
          loadCache();
          succeed = true;
        } catch (Exception e) {
          // hand the failure over to the consumer; it is rethrown by handleException()
          exceptionsQueue.add(e);
        } finally {
          // wake the consumer whether or not the load succeeded so it can pick up either new
          // results or the recorded exception
          notEmpty.signalAll();
          lock.unlock();
          if (prefetchListener != null) {
            prefetchListener.accept(succeed);
          }
        }
      }
    }

  }

}