001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021 022import io.opentelemetry.context.Context; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.Queue; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.atomic.AtomicLong; 031import java.util.concurrent.locks.Condition; 032import java.util.concurrent.locks.Lock; 033import java.util.concurrent.locks.ReentrantLock; 034import java.util.function.Consumer; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.util.Threads; 039import org.apache.yetus.audience.InterfaceAudience; 040 041/** 042 * ClientAsyncPrefetchScanner implements async scanner behaviour. Specifically, the cache used by 043 * this scanner is a concurrent queue which allows both the producer (hbase client) and consumer 044 * (application) to access the queue in parallel. The number of rows returned in a prefetch is 045 * defined by the caching factor and the result size factor. This class allocates a buffer cache, 046 * whose size is a function of both factors. The prefetch is invoked when the cache is half-filled, 047 * instead of waiting for it to be empty. This is defined in the method 048 * {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 049 */ 050@InterfaceAudience.Private 051public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { 052 053 private long maxCacheSize; 054 private AtomicLong cacheSizeInBytes; 055 // exception queue (from prefetch to main scan execution) 056 private final Queue<Exception> exceptionsQueue; 057 // used for testing 058 private Consumer<Boolean> prefetchListener; 059 060 private final Lock lock = new ReentrantLock(); 061 private final Condition notEmpty = lock.newCondition(); 062 private final Condition notFull = lock.newCondition(); 063 064 public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, 065 ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, 066 RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, 067 int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException { 068 super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, 069 scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan); 070 exceptionsQueue = new ConcurrentLinkedQueue<>(); 071 final Context context = Context.current(); 072 final Runnable runnable = context.wrap(new PrefetchRunnable()); 073 Threads.setDaemonThreadRunning(new Thread(runnable), name + ".asyncPrefetcher"); 074 } 075 076 void setPrefetchListener(Consumer<Boolean> prefetchListener) { 077 this.prefetchListener = prefetchListener; 078 } 079 080 @Override 081 protected void initCache() { 082 // Override to put a different cache in place of the super's -- a concurrent one. 083 cache = new LinkedBlockingQueue<>(); 084 maxCacheSize = resultSize2CacheSize(maxScannerResultSize); 085 cacheSizeInBytes = new AtomicLong(0); 086 } 087 088 private long resultSize2CacheSize(long maxResultSize) { 089 // * 2 if possible 090 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 091 } 092 093 @Override 094 public Result next() throws IOException { 095 try (Scope ignored = span.makeCurrent()) { 096 lock.lock(); 097 try { 098 while (cache.isEmpty()) { 099 handleException(); 100 if (this.closed) { 101 return null; 102 } 103 try { 104 notEmpty.await(); 105 } catch (InterruptedException e) { 106 span.recordException(e); 107 throw new InterruptedIOException("Interrupted when wait to load cache"); 108 } 109 } 110 Result result = pollCache(); 111 if (prefetchCondition()) { 112 notFull.signalAll(); 113 } 114 return result; 115 } finally { 116 lock.unlock(); 117 handleException(); 118 } 119 } 120 } 121 122 @Override 123 public void close() { 124 lock.lock(); 125 try { 126 super.close(); 127 closed = true; 128 notFull.signalAll(); 129 notEmpty.signalAll(); 130 } finally { 131 lock.unlock(); 132 } 133 } 134 135 @Override 136 protected void addEstimatedSize(long estimatedSize) { 137 cacheSizeInBytes.addAndGet(estimatedSize); 138 } 139 140 private void handleException() throws IOException { 141 // The prefetch task running in the background puts any exception it 142 // catches into this exception queue. 143 // Rethrow the exception so the application can handle it. 144 while (!exceptionsQueue.isEmpty()) { 145 Exception first = exceptionsQueue.peek(); 146 first.printStackTrace(); 147 if (first instanceof IOException) { 148 throw (IOException) first; 149 } 150 throw (RuntimeException) first; 151 } 152 } 153 154 private boolean prefetchCondition() { 155 return cacheSizeInBytes.get() < maxCacheSize / 2; 156 } 157 158 private Result pollCache() { 159 Result res = cache.poll(); 160 long estimatedSize = calcEstimatedSize(res); 161 addEstimatedSize(-estimatedSize); 162 return res; 163 } 164 165 private class PrefetchRunnable implements Runnable { 166 167 @Override 168 public void run() { 169 while (!closed) { 170 boolean succeed = false; 171 try { 172 lock.lock(); 173 while (!prefetchCondition()) { 174 notFull.await(); 175 } 176 loadCache(); 177 succeed = true; 178 } catch (Exception e) { 179 exceptionsQueue.add(e); 180 span.recordException(e); 181 } finally { 182 notEmpty.signalAll(); 183 lock.unlock(); 184 if (prefetchListener != null) { 185 prefetchListener.accept(succeed); 186 } 187 } 188 } 189 } 190 } 191 192}