001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 021 022import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.Queue; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.atomic.AtomicLong; 031import java.util.concurrent.locks.Condition; 032import java.util.concurrent.locks.Lock; 033import java.util.concurrent.locks.ReentrantLock; 034import java.util.function.Consumer; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 040import org.apache.hadoop.hbase.util.Threads; 041 042/** 043 * ClientAsyncPrefetchScanner implements async scanner behaviour. 044 * Specifically, the cache used by this scanner is a concurrent queue which allows both 045 * the producer (hbase client) and consumer (application) to access the queue in parallel. 046 * The number of rows returned in a prefetch is defined by the caching factor and the result size 047 * factor. 048 * This class allocates a buffer cache, whose size is a function of both factors. 049 * The prefetch is invoked when the cache is halfÂfilled, instead of waiting for it to be empty. 050 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 051 */ 052@InterfaceAudience.Private 053public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { 054 055 private long maxCacheSize; 056 private AtomicLong cacheSizeInBytes; 057 // exception queue (from prefetch to main scan execution) 058 private Queue<Exception> exceptionsQueue; 059 // prefetch thread to be executed asynchronously 060 private Thread prefetcher; 061 // used for testing 062 private Consumer<Boolean> prefetchListener; 063 064 private final Lock lock = new ReentrantLock(); 065 private final Condition notEmpty = lock.newCondition(); 066 private final Condition notFull = lock.newCondition(); 067 068 public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, 069 ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, 070 RpcControllerFactory rpcControllerFactory, ExecutorService pool, 071 int replicaCallTimeoutMicroSecondScan) throws IOException { 072 super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, 073 replicaCallTimeoutMicroSecondScan); 074 } 075 076 @VisibleForTesting 077 void setPrefetchListener(Consumer<Boolean> prefetchListener) { 078 this.prefetchListener = prefetchListener; 079 } 080 081 @Override 082 protected void initCache() { 083 // concurrent cache 084 maxCacheSize = resultSize2CacheSize(maxScannerResultSize); 085 cache = new LinkedBlockingQueue<>(); 086 cacheSizeInBytes = new AtomicLong(0); 087 exceptionsQueue = new ConcurrentLinkedQueue<>(); 088 prefetcher = new Thread(new PrefetchRunnable()); 089 Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher"); 090 } 091 092 private long resultSize2CacheSize(long maxResultSize) { 093 // * 2 if possible 094 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 095 } 096 097 @Override 098 public Result next() throws IOException { 099 try { 100 lock.lock(); 101 while (cache.isEmpty()) { 102 handleException(); 103 if (this.closed) { 104 return null; 105 } 106 try { 107 notEmpty.await(); 108 } catch (InterruptedException e) { 109 throw new InterruptedIOException("Interrupted when wait to load cache"); 110 } 111 } 112 113 Result result = pollCache(); 114 if (prefetchCondition()) { 115 notFull.signalAll(); 116 } 117 return result; 118 } finally { 119 lock.unlock(); 120 handleException(); 121 } 122 } 123 124 @Override 125 public void close() { 126 try { 127 lock.lock(); 128 super.close(); 129 closed = true; 130 notFull.signalAll(); 131 notEmpty.signalAll(); 132 } finally { 133 lock.unlock(); 134 } 135 } 136 137 @Override 138 protected void addEstimatedSize(long estimatedSize) { 139 cacheSizeInBytes.addAndGet(estimatedSize); 140 } 141 142 private void handleException() throws IOException { 143 //The prefetch task running in the background puts any exception it 144 //catches into this exception queue. 145 // Rethrow the exception so the application can handle it. 146 while (!exceptionsQueue.isEmpty()) { 147 Exception first = exceptionsQueue.peek(); 148 first.printStackTrace(); 149 if (first instanceof IOException) { 150 throw (IOException) first; 151 } 152 throw (RuntimeException) first; 153 } 154 } 155 156 private boolean prefetchCondition() { 157 return cacheSizeInBytes.get() < maxCacheSize / 2; 158 } 159 160 private Result pollCache() { 161 Result res = cache.poll(); 162 long estimatedSize = calcEstimatedSize(res); 163 addEstimatedSize(-estimatedSize); 164 return res; 165 } 166 167 private class PrefetchRunnable implements Runnable { 168 169 @Override 170 public void run() { 171 while (!closed) { 172 boolean succeed = false; 173 try { 174 lock.lock(); 175 while (!prefetchCondition()) { 176 notFull.await(); 177 } 178 loadCache(); 179 succeed = true; 180 } catch (Exception e) { 181 exceptionsQueue.add(e); 182 } finally { 183 notEmpty.signalAll(); 184 lock.unlock(); 185 if (prefetchListener != null) { 186 prefetchListener.accept(succeed); 187 } 188 } 189 } 190 } 191 192 } 193 194}