/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
 * in background and cache it in memory. Typically the {@link #maxCacheSize} will be
 * {@code 2 * scan.getMaxResultSize()}.
 * <p>
 * The scanner acts as its own {@link AdvancedScanResultConsumer}: the scan callbacks fill an
 * in-memory queue and {@link #next()} drains it. All access to the queue, the cache size, the
 * closed flag, the error and the resumer happens inside {@code synchronized} methods of this
 * object, and {@link #next()} blocks on this object's monitor while the queue is empty.
 */
@InterfaceAudience.Private
class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);

  private final AsyncTable<AdvancedScanResultConsumer> rawTable;

  /** Once {@link #cacheSize} reaches this value the scan is suspended. */
  private final long maxCacheSize;

  private final Scan scan;

  /** Results (and cursor results) received from the scan but not yet returned by next(). */
  private final Queue<Result> queue = new ArrayDeque<>();

  private ScanMetrics scanMetrics;

  /** Estimated size of the non-cursor results currently held in {@link #queue}. */
  private long cacheSize;

  private boolean closed = false;

  /** The failure reported through {@link #onError(Throwable)}, if any. */
  private Throwable error;

  /** Non-null only while the scan is suspended; used to resume it. */
  private ScanResumer resumer;

  /**
   * Creates the scanner and immediately starts the scan on the given table, registering this
   * object as the result consumer.
   * @param table the table to scan
   * @param scan the scan to run
   * @param maxCacheSize the cache size at which prefetching is suspended
   */
  public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
      long maxCacheSize) {
    this.rawTable = table;
    this.maxCacheSize = maxCacheSize;
    this.scan = scan;
    table.scan(scan, this);
  }

  /** Appends a result to the queue and accounts for its estimated size. */
  private void addToCache(Result result) {
    queue.add(result);
    cacheSize += calcEstimatedSize(result);
  }

  /** Suspends the scan and remembers the resumer so that next()/close() can resume it. */
  private void stopPrefetch(ScanController controller) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "{} stop prefetching when scanning {} as the cache size {} is greater than the "
          + "maxCacheSize {}",
        String.format("0x%x", System.identityHashCode(this)), rawTable.getName(), cacheSize,
        maxCacheSize);
    }
    resumer = controller.suspend();
  }

  /**
   * Caches the received results and wakes up any thread blocked in {@link #next()}. Terminates
   * the scan if this scanner has already been closed, and suspends it once the cache size has
   * reached {@link #maxCacheSize}.
   */
  @Override
  public synchronized void onNext(Result[] results, ScanController controller) {
    assert results.length > 0;
    if (closed) {
      controller.terminate();
      return;
    }
    for (Result result : results) {
      addToCache(result);
    }
    notifyAll();
    if (cacheSize >= maxCacheSize) {
      stopPrefetch(controller);
    }
  }

  /**
   * Terminates the scan if this scanner has been closed. Otherwise, when the scan asks for
   * cursor results and the controller has a cursor, queues a cursor result. Cursor results are
   * not counted in the cache size.
   */
  @Override
  public synchronized void onHeartbeat(ScanController controller) {
    if (closed) {
      controller.terminate();
      return;
    }
    if (scan.isNeedCursorResult()) {
      controller.cursor().ifPresent(c -> {
        queue.add(Result.createCursorResult(c));
        // Wake up a thread blocked in next(); without this the cursor result would sit in the
        // queue until some later callback happened to notify.
        notifyAll();
      });
    }
  }

  /** Records the failure and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onError(Throwable error) {
    this.error = error;
    notifyAll();
  }

  /** Marks the scanner as closed and wakes up any thread blocked in {@link #next()}. */
  @Override
  public synchronized void onComplete() {
    closed = true;
    notifyAll();
  }

  @Override
  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
    this.scanMetrics = scanMetrics;
  }

  /** Resumes a suspended scan and clears the resumer. Callers must check it is non-null. */
  private void resumePrefetch() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("{} resume prefetching", String.format("0x%x", System.identityHashCode(this)));
    }
    resumer.resume();
    resumer = null;
  }

  /**
   * Returns the next cached result, blocking while the cache is empty. Cached results are
   * always drained before the closed flag or a recorded error is looked at.
   * @return the next result, or {@code null} if the cache is empty and the scanner is closed
   * @throws IOException if the cache is empty and the scan has reported an error; an
   *           {@link InterruptedIOException} if the calling thread is interrupted while waiting
   */
  @Override
  public synchronized Result next() throws IOException {
    while (queue.isEmpty()) {
      if (closed) {
        return null;
      }
      if (error != null) {
        Throwables.propagateIfPossible(error, IOException.class);
        throw new IOException(error);
      }
      try {
        wait();
      } catch (InterruptedException e) {
        // InterruptedIOException has no cause-taking constructor, so attach the cause manually.
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(e);
        throw iioe;
      }
    }
    Result result = queue.poll();
    if (!result.isCursor()) {
      // Only non-cursor results were counted when they were cached.
      cacheSize -= calcEstimatedSize(result);
      // Resume a suspended scan once the cache has drained to half of its limit.
      if (resumer != null && cacheSize <= maxCacheSize / 2) {
        resumePrefetch();
      }
    }
    return result;
  }

  /**
   * Closes the scanner: drops everything cached, resumes the scan if it is suspended (the next
   * callback then sees the closed flag and terminates it), and wakes up any thread blocked in
   * {@link #next()}.
   */
  @Override
  public synchronized void close() {
    closed = true;
    queue.clear();
    cacheSize = 0;
    if (resumer != null) {
      resumePrefetch();
    }
    notifyAll();
  }

  @Override
  public boolean renewLease() {
    // we will do prefetching in the background and if there is no space we will just suspend the
    // scanner. The renew lease operation will be handled in the background.
    return false;
  }

  // used in tests to test whether the scanner has been suspended
  @VisibleForTesting
  synchronized boolean isSuspended() {
    return resumer != null;
  }

  @Override
  public ScanMetrics getScanMetrics() {
    return scanMetrics;
  }
}