001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayDeque; 025import java.util.Queue; 026import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 027import org.apache.hadoop.hbase.util.FutureUtils; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically 034 * in background and cache it in memory. Typically the {@link #maxCacheSize} will be 035 * {@code 2 * scan.getMaxResultSize()}. 036 */ 037@InterfaceAudience.Private 038class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer { 039 040 private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class); 041 042 private final AsyncTable<AdvancedScanResultConsumer> rawTable; 043 044 private final long maxCacheSize; 045 046 private final Scan scan; 047 048 private final Queue<Result> queue = new ArrayDeque<>(); 049 050 private ScanMetrics scanMetrics; 051 052 private long cacheSize; 053 054 private boolean closed = false; 055 056 private Throwable error; 057 058 private ScanResumer resumer; 059 060 public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan, 061 long maxCacheSize) { 062 this.rawTable = table; 063 this.maxCacheSize = maxCacheSize; 064 this.scan = scan; 065 table.scan(scan, this); 066 } 067 068 private void addToCache(Result result) { 069 queue.add(result); 070 cacheSize += calcEstimatedSize(result); 071 } 072 073 private void stopPrefetch(ScanController controller) { 074 if (LOG.isDebugEnabled()) { 075 LOG.debug(String.format("0x%x", System.identityHashCode(this)) + 076 " stop prefetching when scanning " + rawTable.getName() + " as the cache size " + 077 cacheSize + " is greater than the maxCacheSize " + maxCacheSize); 078 } 079 resumer = controller.suspend(); 080 } 081 082 @Override 083 public synchronized void onNext(Result[] results, ScanController controller) { 084 assert results.length > 0; 085 if (closed) { 086 controller.terminate(); 087 return; 088 } 089 for (Result result : results) { 090 addToCache(result); 091 } 092 notifyAll(); 093 if (cacheSize >= maxCacheSize) { 094 stopPrefetch(controller); 095 } 096 } 097 098 @Override 099 public synchronized void onHeartbeat(ScanController controller) { 100 if (closed) { 101 controller.terminate(); 102 return; 103 } 104 if (scan.isNeedCursorResult()) { 105 controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c))); 106 } 107 } 108 109 @Override 110 public synchronized void onError(Throwable error) { 111 this.error = error; 112 notifyAll(); 113 } 114 115 @Override 116 public synchronized void onComplete() { 117 closed = true; 118 notifyAll(); 119 } 120 121 @Override 122 public void onScanMetricsCreated(ScanMetrics scanMetrics) { 123 this.scanMetrics = scanMetrics; 124 } 125 126 private void resumePrefetch() { 127 if (LOG.isDebugEnabled()) { 128 LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); 129 } 130 resumer.resume(); 131 resumer = null; 132 } 133 134 @Override 135 public synchronized Result next() throws IOException { 136 while (queue.isEmpty()) { 137 if (closed) { 138 return null; 139 } 140 if (error != null) { 141 FutureUtils.rethrow(error); 142 } 143 try { 144 wait(); 145 } catch (InterruptedException e) { 146 throw new InterruptedIOException(); 147 } 148 } 149 Result result = queue.poll(); 150 if (!result.isCursor()) { 151 cacheSize -= calcEstimatedSize(result); 152 if (resumer != null && cacheSize <= maxCacheSize / 2) { 153 resumePrefetch(); 154 } 155 } 156 return result; 157 } 158 159 @Override 160 public synchronized void close() { 161 closed = true; 162 queue.clear(); 163 cacheSize = 0; 164 if (resumer != null) { 165 resumePrefetch(); 166 } 167 notifyAll(); 168 } 169 170 @Override 171 public boolean renewLease() { 172 // we will do prefetching in the background and if there is no space we will just suspend the 173 // scanner. The renew lease operation will be handled in the background. 174 return false; 175 } 176 177 // used in tests to test whether the scanner has been suspended 178 synchronized boolean isSuspended() { 179 return resumer != null; 180 } 181 182 @Override 183 public ScanMetrics getScanMetrics() { 184 return scanMetrics; 185 } 186}